Skip to content

Commit

Permalink
add average sessions duration
Browse files Browse the repository at this point in the history
  • Loading branch information
“Giems” committed Feb 22, 2024
1 parent cf93a57 commit 42b6c83
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
----------------- Hourly requests per app -----------------
--- View
CREATE MATERIALIZED VIEW hourly_requests_per_app
WITH (timescaledb.continuous)
AS SELECT
Expand All @@ -9,12 +10,17 @@ FROM requests
GROUP BY app_id, hourly_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('hourly_requests_per_app',
start_offset => INTERVAL '3 h',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '1 h');

--- Real time aggregation
ALTER MATERIALIZED VIEW hourly_requests_per_app set (timescaledb.materialized_only = false);

----------------- Daily requests per app -----------------
--- View
CREATE MATERIALIZED VIEW daily_requests_per_app
WITH (timescaledb.continuous) AS
SELECT
Expand All @@ -25,12 +31,17 @@ FROM hourly_requests_per_app
GROUP BY app_id, daily_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('daily_requests_per_app',
start_offset => INTERVAL '3 d',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '12 h');

--- Real time aggregation
ALTER MATERIALIZED VIEW daily_requests_per_app set (timescaledb.materialized_only = false);

----------------- Monthly requests per app -----------------
--- View
CREATE MATERIALIZED VIEW monthly_requests_per_app
WITH (timescaledb.continuous) AS
SELECT
Expand All @@ -41,9 +52,12 @@ FROM daily_requests_per_app
GROUP BY app_id, monthly_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('monthly_requests_per_app',
start_offset => INTERVAL '3 month',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '1 month');

--- Real time aggregation
ALTER MATERIALIZED VIEW monthly_requests_per_app set (timescaledb.materialized_only = false);

47 changes: 47 additions & 0 deletions database/migrations/000000000008_session_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
----------------- Daily Average session duration -----------------
--- View
CREATE MATERIALIZED VIEW avg_session_duration_per_app_daily
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 day'::interval, session_open_timestamp) AS daily_bucket,
stats_agg(EXTRACT(EPOCH FROM (session_close_timestamp - session_open_timestamp))) AS daily_session_stats
FROM
sessions
WHERE
session_close_timestamp IS NOT NULL
GROUP BY
app_id, daily_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('avg_session_duration_per_app_daily',
start_offset => INTERVAL '14 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');

--- Real time aggregation
ALTER MATERIALIZED VIEW avg_session_duration_per_app_daily set (timescaledb.materialized_only = false);

----------------- Monthly Average session duration -----------------
--- View
CREATE MATERIALIZED VIEW avg_session_duration_per_app_monthly
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 month'::interval, daily_bucket) AS monthly_bucket,
average(rollup(daily_session_stats)) AS avg_monthly_session_duration_seconds
FROM
avg_session_duration_per_app_daily
GROUP BY
app_id, monthly_bucket
WITH NO DATA;

--- Refresh policy
SELECT add_continuous_aggregate_policy('avg_session_duration_per_app_monthly',
start_offset => INTERVAL '3 months',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 day');

--- Real time aggregation
ALTER MATERIALIZED VIEW avg_session_duration_per_app_monthly set (timescaledb.materialized_only = false);
7 changes: 7 additions & 0 deletions database/src/structs/filter_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ pub struct AggregatedRequestCount {
pub bucket: DateTime<Utc>,
pub request_count: i64,
}

#[derive(Debug, sqlx::FromRow)]
pub struct SessionDurationAverage {
pub app_id: String,
pub bucket: DateTime<Utc>,
pub average_duration_seconds: f64,
}
18 changes: 17 additions & 1 deletion database/src/tables/registered_app/select.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::table_struct::{RegisteredApp, REGISTERED_APPS_TABLE_NAME};
use crate::structs::filter_requests::AggregatedRequestCount;
use crate::structs::filter_requests::{AggregatedRequestCount, SessionDurationAverage};
use crate::structs::time_filters::TimeFilter;
use crate::tables::requests::table_struct::REQUESTS_TABLE_NAME;
use crate::{db::Db, tables::requests::table_struct::Request};
Expand Down Expand Up @@ -75,4 +75,20 @@ impl Db {
.fetch_all(&self.connection_pool)
.await
}

pub async fn get_monthly_session_duration_average(
&self,
app_id: &str,
) -> Result<Vec<SessionDurationAverage>, Error> {
let query = format!(
"SELECT app_id, monthly_bucket as bucket, avg_monthly_session_duration_seconds as average_duration_seconds
FROM avg_session_duration_per_app_monthly
WHERE app_id = $1"
);

sqlx::query_as::<_, SessionDurationAverage>(&query)
.bind(app_id)
.fetch_all(&self.connection_pool)
.await
}
}
94 changes: 92 additions & 2 deletions database/src/tables/registered_app/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod tests {
use tokio::task;

#[tokio::test]
async fn test_data_ranges() {
async fn test_requests_count() {
let db = super::Db::connect_to_the_pool().await;
db.truncate_all_tables().await.unwrap();

Expand Down Expand Up @@ -136,7 +136,14 @@ mod tests {
}

// We need to refresh manually the views
db_arc.refresh_continuous_aggregates().await.unwrap();
db_arc
.refresh_continuous_aggregates(vec![
"daily_requests_per_app".to_string(),
"hourly_requests_per_app".to_string(),
"monthly_requests_per_app".to_string(),
])
.await
.unwrap();

let result = db_arc
.get_aggregated_requests_by_app_id(&app.app_id, TimeFilter::Last24Hours)
Expand Down Expand Up @@ -165,4 +172,87 @@ mod tests {
assert_eq!(result[0].request_count, 100);
assert_eq!(result[30].request_count, 70);
}

#[tokio::test]
async fn test_average_session_duration() {
let db = super::Db::connect_to_the_pool().await;
db.truncate_all_tables().await.unwrap();

// "Register" an app
let app = RegisteredApp {
app_id: "test_app_id".to_string(),
app_name: "test_app_name".to_string(),
whitelisted_domains: vec!["test_domain".to_string()],
subscription: None,
ack_public_keys: vec!["test_key".to_string()],
email: None,
registration_timestamp: 0,
pass_hash: None,
};

db.register_new_app(&app).await.unwrap();

let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
assert_eq!(app, result);

// Create sessions
let now = Utc::now();
let sessions = vec![
(
now - Duration::from_secs(4 * 60 * 60),
now - Duration::from_secs((4 * 60 - 85) * 60),
), // 1 hour 25 minutes session, 4 hours ago
(
now - Duration::from_secs((2 * 60 + 35) * 60),
now - Duration::from_secs((2 * 60 + 10) * 60),
), // 25 minutes session, 2 hours and 35 minutes ago
(
now - Duration::from_secs((1 * 60 + 50) * 60),
now - Duration::from_secs((1 * 60 + 40) * 60),
), // 10 minutes session, 1 hour and 50 minutes ago
];

for (start, end) in sessions.iter() {
let session = DbNcSession {
session_id: format!("session_id_{}", start.timestamp()),
app_id: "test_app_id".to_string(),
app_metadata: "test_app_metadata".to_string(),
app_ip_address: "test_app_ip_address".to_string(),
persistent: false,
network: "test_network".to_string(),
client: None,
session_open_timestamp: *start,
session_close_timestamp: None,
};

db.save_new_session(&session).await.unwrap();
db.close_session(&session.session_id, *end).await.unwrap();
}

// We need to refresh manually the views
db.refresh_continuous_aggregates(vec![
"avg_session_duration_per_app_monthly".to_string(),
"avg_session_duration_per_app_daily".to_string(),
])
.await
.unwrap();

let result = db
.get_monthly_session_duration_average(&app.app_id)
.await
.unwrap();

assert_eq!(result.len(), 1);

let expected_avg_duration_seconds: f64 = sessions
.iter()
.map(|(start, end)| (end.timestamp() - start.timestamp()) as f64)
.sum::<f64>()
/ sessions.len() as f64;

assert_eq!(
result[0].average_duration_seconds,
expected_avg_duration_seconds
);
}
}
10 changes: 6 additions & 4 deletions database/src/tables/sessions/update.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::table_struct::{DbNcSession, SESSIONS_KEYS, SESSIONS_TABLE_NAME};
use crate::{db::Db, tables::utils::get_date_time};
use crate::db::Db;
use sqlx::{
query,
types::chrono::{DateTime, Utc},
Expand Down Expand Up @@ -50,14 +50,14 @@ impl Db {
pub async fn close_session(
&self,
session_id: &String,
close_timestamp: u64,
close_timestamp: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
let query_body = format!(
"UPDATE {SESSIONS_TABLE_NAME} SET session_close_timestamp = $1 WHERE session_id = $2"
);

let query_result = query(&query_body)
.bind(get_date_time(close_timestamp))
.bind(close_timestamp)
.bind(session_id)
.execute(&self.connection_pool)
.await;
Expand Down Expand Up @@ -134,7 +134,9 @@ mod tests {
assert_eq!(session, session);

// Change the session status to closed
db.close_session(&session.session_id, 15).await.unwrap();
db.close_session(&session.session_id, get_date_time(15).unwrap())
.await
.unwrap();

// Get session by session_id to check if the session status is closed
let session = db
Expand Down
27 changes: 14 additions & 13 deletions database/src/tables/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ pub mod test_utils {
Ok(())
}

pub async fn refresh_continuous_aggregates(&self) -> Result<(), sqlx::Error> {
// Refresh the hourly_requests_per_app view
let _ = sqlx::query(
"CALL refresh_continuous_aggregate('hourly_requests_per_app', NULL, NULL)",
)
.execute(&self.connection_pool)
.await?;
pub async fn refresh_continuous_aggregates(
&self,
views: Vec<String>,
) -> Result<(), sqlx::Error> {
// Refresh views
for view in views.iter() {
let _ = sqlx::query(&format!(
"CALL refresh_continuous_aggregate('{view}', NULL, NULL)",
view = view
))
.execute(&self.connection_pool)
.await?;
}

// Refresh the daily_requests_per_app view
let _ = sqlx::query(
"CALL refresh_continuous_aggregate('daily_requests_per_app', NULL, NULL)",
)
.execute(&self.connection_pool)
.await?;
println!("Refreshed {} continuous aggregates", views.len());

Ok(())
}
Expand Down

0 comments on commit 42b6c83

Please sign in to comment.