Skip to content

Commit

Permalink
add "average" opened sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
“Giems” committed Feb 22, 2024
1 parent 42b6c83 commit 4cafaea
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 23 deletions.
25 changes: 15 additions & 10 deletions database/migrations/000000000008_session_stats.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
----------------- Daily Average session duration -----------------
----------------- Sessions Stats -----------------
--- View
CREATE MATERIALIZED VIEW avg_session_duration_per_app_daily
CREATE MATERIALIZED VIEW sessions_stats_per_app_daily
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 day'::interval, session_open_timestamp) AS daily_bucket,
stats_agg(EXTRACT(EPOCH FROM (session_close_timestamp - session_open_timestamp))) AS daily_session_stats
COUNT(*) AS num_opened_sessions,
stats_agg(EXTRACT(EPOCH FROM (session_close_timestamp - session_open_timestamp))) AS avg_daily_session_duration_seconds
FROM
sessions
WHERE
Expand All @@ -15,33 +16,37 @@ GROUP BY
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('avg_session_duration_per_app_daily',
SELECT add_continuous_aggregate_policy('sessions_stats_per_app_daily',
start_offset => INTERVAL '14 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');

--- Real time aggregation
ALTER MATERIALIZED VIEW avg_session_duration_per_app_daily set (timescaledb.materialized_only = false);
ALTER MATERIALIZED VIEW sessions_stats_per_app_daily SET (timescaledb.materialized_only = false);



----------------- Monthly Average session duration -----------------
--- View
CREATE MATERIALIZED VIEW avg_session_duration_per_app_monthly
CREATE MATERIALIZED VIEW sessions_stats_per_app_monthly
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 month'::interval, daily_bucket) AS monthly_bucket,
average(rollup(daily_session_stats)) AS avg_monthly_session_duration_seconds
SUM(num_opened_sessions)::BIGINT AS total_opened_sessions_monthly,
average(rollup(avg_daily_session_duration_seconds)) AS avg_monthly_session_duration_seconds,
SUM(num_opened_sessions)::BIGINT / COUNT(DISTINCT daily_bucket) AS avg_daily_opened_sessions
FROM
avg_session_duration_per_app_daily
sessions_stats_per_app_daily
GROUP BY
app_id, monthly_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('avg_session_duration_per_app_monthly',
SELECT add_continuous_aggregate_policy('sessions_stats_per_app_monthly',
start_offset => INTERVAL '3 months',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

--- Real time aggregation
ALTER MATERIALIZED VIEW avg_session_duration_per_app_monthly set (timescaledb.materialized_only = false);
ALTER MATERIALIZED VIEW sessions_stats_per_app_monthly SET (timescaledb.materialized_only = false);
4 changes: 3 additions & 1 deletion database/src/structs/filter_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ pub struct AggregatedRequestCount {
}

#[derive(Debug, sqlx::FromRow)]
pub struct SessionDurationAverage {
pub struct SessionsStats {
pub app_id: String,
pub bucket: DateTime<Utc>,
pub sessions_opened: i64,
pub average_duration_seconds: f64,
pub avg_daily_opened_sessions: i64,
}
17 changes: 11 additions & 6 deletions database/src/tables/registered_app/select.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::table_struct::{RegisteredApp, REGISTERED_APPS_TABLE_NAME};
use crate::structs::filter_requests::{AggregatedRequestCount, SessionDurationAverage};
use crate::structs::filter_requests::{AggregatedRequestCount, SessionsStats};
use crate::structs::time_filters::TimeFilter;
use crate::tables::requests::table_struct::REQUESTS_TABLE_NAME;
use crate::{db::Db, tables::requests::table_struct::Request};
Expand Down Expand Up @@ -76,17 +76,22 @@ impl Db {
.await
}

pub async fn get_monthly_session_duration_average(
pub async fn get_monthly_sessions_stats(
&self,
app_id: &str,
) -> Result<Vec<SessionDurationAverage>, Error> {
) -> Result<Vec<SessionsStats>, Error> {
let query = format!(
"SELECT app_id, monthly_bucket as bucket, avg_monthly_session_duration_seconds as average_duration_seconds
FROM avg_session_duration_per_app_monthly
"SELECT
app_id,
monthly_bucket as bucket,
total_opened_sessions_monthly as sessions_opened,
avg_monthly_session_duration_seconds as average_duration_seconds,
avg_daily_opened_sessions
FROM sessions_stats_per_app_monthly
WHERE app_id = $1"
);

sqlx::query_as::<_, SessionDurationAverage>(&query)
sqlx::query_as::<_, SessionsStats>(&query)
.bind(app_id)
.fetch_all(&self.connection_pool)
.await
Expand Down
167 changes: 161 additions & 6 deletions database/src/tables/registered_app/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,13 @@ mod tests {

// We need to refresh manually the views
db.refresh_continuous_aggregates(vec![
"avg_session_duration_per_app_monthly".to_string(),
"avg_session_duration_per_app_daily".to_string(),
"sessions_stats_per_app_monthly".to_string(),
"sessions_stats_per_app_daily".to_string(),
])
.await
.unwrap();

let result = db
.get_monthly_session_duration_average(&app.app_id)
.await
.unwrap();
let result = db.get_monthly_sessions_stats(&app.app_id).await.unwrap();

assert_eq!(result.len(), 1);

Expand All @@ -255,4 +252,162 @@ mod tests {
expected_avg_duration_seconds
);
}

#[tokio::test]
async fn test_sessions_count() {
let db = super::Db::connect_to_the_pool().await;
db.truncate_all_tables().await.unwrap();

// "Register" an app
let app = RegisteredApp {
app_id: "test_app_id".to_string(),
app_name: "test_app_name".to_string(),
whitelisted_domains: vec!["test_domain".to_string()],
subscription: None,
ack_public_keys: vec!["test_key".to_string()],
email: None,
registration_timestamp: 0,
pass_hash: None,
};

db.register_new_app(&app).await.unwrap();

let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
assert_eq!(app, result);

// Number of sessions to create
let num_sessions: u64 = 100;
let now = Utc::now();
let start_of_period = now - Duration::from_secs(60 * 60 * 24 * 14); // 14 days

// Generate and save sessions
for i in 0..num_sessions {
let session_start =
start_of_period + Duration::from_secs(i * 86400 / num_sessions as u64); // spread sessions evenly over 14 days
let session_end = session_start + Duration::from_secs(60 * 30); // duration of 30 minutes for each session

let session = DbNcSession {
session_id: format!("session_{}_{}", app.app_id, i),
app_id: app.app_id.clone(),
app_metadata: "test_metadata".to_string(),
app_ip_address: "127.0.0.1".to_string(),
persistent: false,
network: "test_network".to_string(),
client: None,
session_open_timestamp: session_start,
session_close_timestamp: None,
};

db.save_new_session(&session).await.unwrap();
db.close_session(&session.session_id, session_end)
.await
.unwrap();
}

// Manually refresh the continuous aggregates
db.refresh_continuous_aggregates(vec![
"sessions_stats_per_app_monthly".to_string(),
"sessions_stats_per_app_daily".to_string(),
])
.await
.unwrap();

let stats = db.get_monthly_sessions_stats(&app.app_id).await.unwrap();

assert_eq!(stats.len(), 1);
assert_eq!(stats[0].sessions_opened, num_sessions as i64);
}

#[tokio::test]
async fn test_sessions_average_daily_opened_sessions() {
let db = super::Db::connect_to_the_pool().await;
db.truncate_all_tables().await.unwrap();

// "Register" an app
let app = RegisteredApp {
app_id: "test_app_id".to_string(),
app_name: "test_app_name".to_string(),
whitelisted_domains: vec!["test_domain".to_string()],
subscription: None,
ack_public_keys: vec!["test_key".to_string()],
email: None,
registration_timestamp: 0,
pass_hash: None,
};

db.register_new_app(&app).await.unwrap();

let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
assert_eq!(app, result);

let now = Utc::now();
let start_of_first_period = now - Duration::from_secs(60 * 60 * 24 * 60); // Start of first period, 60 days ago
let start_of_second_period = now - Duration::from_secs(60 * 60 * 24 * 30); // Start of second period, 30 days ago
let num_sessions_first_period: u64 = 40;
let num_sessions_second_period: u64 = 28;

// Generate and save sessions for the first period
for i in 0..num_sessions_first_period {
let session_start = start_of_first_period
+ Duration::from_secs(i * 86400 / num_sessions_first_period as u64);
let session_end = session_start + Duration::from_secs(60 * 30); // Duration of 30 minutes for each session

let session = DbNcSession {
session_id: format!("session_{}_{}", app.app_id, i),
app_id: app.app_id.clone(),
app_metadata: "test_metadata".to_string(),
app_ip_address: "127.0.0.1".to_string(),
persistent: false,
network: "test_network".to_string(),
client: None,
session_open_timestamp: session_start,
session_close_timestamp: Some(session_end),
};

db.save_new_session(&session).await.unwrap();
db.close_session(&session.session_id, session_end)
.await
.unwrap();
}

// Generate and save sessions for the second period
for i in 0..num_sessions_second_period {
let session_start = start_of_second_period
+ Duration::from_secs(i * 86400 / num_sessions_second_period as u64);
let session_end = session_start + Duration::from_secs(60 * 30); // Duration of 30 minutes for each session

let session = DbNcSession {
session_id: format!("session_{}_{}_2nd", app.app_id, i), // Ensure unique session IDs for the second period
app_id: app.app_id.clone(),
app_metadata: "test_metadata".to_string(),
app_ip_address: "127.0.0.1".to_string(),
persistent: false,
network: "test_network".to_string(),
client: None,
session_open_timestamp: session_start,
session_close_timestamp: Some(session_end),
};

db.save_new_session(&session).await.unwrap();
db.close_session(&session.session_id, session_end)
.await
.unwrap();
}

// Manually refresh the continuous aggregates
db.refresh_continuous_aggregates(vec![
"sessions_stats_per_app_daily".to_string(),
"sessions_stats_per_app_monthly".to_string(),
])
.await
.unwrap();

let stats = db.get_monthly_sessions_stats(&app.app_id).await.unwrap();

// assert_eq!(stats.len(), 2);
println!("Stats LEN: {:?}", stats.len());
for stat in stats {
println!("Stat: {:?}", stat);
}
}
}

0 comments on commit 4cafaea

Please sign in to comment.