From d6ab9dbbd7c7180b13b82af4f0d6064f44d51a5a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”>
Date: Wed, 21 Feb 2024 15:18:29 +0100
Subject: [PATCH 1/5] update migrations

---
 database/migrations/000000000003_sessions.sql | 18 ++-----
 database/migrations/000000000005_requests.sql | 15 ++----
 .../000000000006_create_hypertables.sql       |  2 +
 .../migrations/000000000007_app_queries.sql   | 49 +++++++++++++++++++
 4 files changed, 60 insertions(+), 24 deletions(-)
 create mode 100644 database/migrations/000000000006_create_hypertables.sql
 create mode 100644 database/migrations/000000000007_app_queries.sql

diff --git a/database/migrations/000000000003_sessions.sql b/database/migrations/000000000003_sessions.sql
index 72afc282..4cd9d9e3 100644
--- a/database/migrations/000000000003_sessions.sql
+++ b/database/migrations/000000000003_sessions.sql
@@ -1,6 +1,6 @@
 CREATE TABLE sessions(
-    session_id TEXT NOT NULL UNIQUE,
-    app_id TEXT NOT NULL,
+    session_id TEXT NOT NULL,
+    app_id TEXT NOT NULL REFERENCES registered_apps(app_id),
     app_metadata TEXT NOT NULL,
     app_ip_address TEXT NOT NULL,
     persistent BOOLEAN NOT NULL,
@@ -9,15 +9,7 @@ CREATE TABLE sessions(
     client_device TEXT,
     client_metadata TEXT,
     client_notification_endpoint TEXT,
-    client_connected_at BIGINT,
-    session_open_timestamp BIGINT NOT NULL,
-    session_close_timestamp BIGINT
+    client_connected_at TIMESTAMPTZ,
+    session_open_timestamp TIMESTAMPTZ NOT NULL,
+    session_close_timestamp TIMESTAMPTZ
 );
-
-CREATE UNIQUE INDEX sessions_session_id ON sessions(session_id);
-
-ALTER TABLE sessions
-ADD CONSTRAINT fk_sessions_registered_apps
-FOREIGN KEY (app_id)
-REFERENCES registered_apps (app_id)
-ON DELETE CASCADE;
\ No newline at end of file

diff --git a/database/migrations/000000000005_requests.sql b/database/migrations/000000000005_requests.sql
index 380f10b7..bcfdf6ab 100644
--- a/database/migrations/000000000005_requests.sql
+++ b/database/migrations/000000000005_requests.sql
@@ -1,16 +1,9 @@
 CREATE TABLE requests(
-    request_id TEXT NOT NULL UNIQUE,
-    request_type TEXT NOT NULL,
+    request_id TEXT NOT NULL,
     session_id TEXT NOT NULL,
+    app_id TEXT NOT NULL,
+    request_type TEXT NOT NULL,
     request_status request_status_enum NOT NULL,
     network TEXT NOT NULL,
-    creation_timestamp BIGINT NOT NULL
+    creation_timestamp TIMESTAMPTZ NOT NULL
 );
-
-CREATE UNIQUE INDEX requests_request_id ON requests(request_id);
-
-ALTER TABLE requests
-ADD CONSTRAINT fk_requests_sessions
-FOREIGN KEY (session_id)
-REFERENCES sessions (session_id)
-ON DELETE CASCADE;
\ No newline at end of file

diff --git a/database/migrations/000000000006_create_hypertables.sql b/database/migrations/000000000006_create_hypertables.sql
new file mode 100644
index 00000000..3bd7bacf
--- /dev/null
+++ b/database/migrations/000000000006_create_hypertables.sql
@@ -0,0 +1,2 @@
+SELECT create_hypertable('sessions', 'session_open_timestamp');
+SELECT create_hypertable('requests', 'creation_timestamp');

diff --git a/database/migrations/000000000007_app_queries.sql b/database/migrations/000000000007_app_queries.sql
new file mode 100644
index 00000000..2c5332c8
--- /dev/null
+++ b/database/migrations/000000000007_app_queries.sql
@@ -0,0 +1,49 @@
+----------------- Hourly requests per app -----------------
+CREATE MATERIALIZED VIEW hourly_requests_per_app
+WITH (timescaledb.continuous)
+AS SELECT
+    app_id,
+    time_bucket('1 h'::interval, creation_timestamp) as bucket,
+    COUNT(*) AS request_count
+FROM requests
+GROUP BY app_id, bucket
+WITH NO DATA;
+
+SELECT add_continuous_aggregate_policy('hourly_requests_per_app',
+    start_offset => INTERVAL '3 h',
+    end_offset => INTERVAL '1 h',
+    schedule_interval => INTERVAL '1 h');
+
+----------------- Daily requests per app -----------------
+CREATE MATERIALIZED VIEW daily_requests_per_app
+WITH (timescaledb.continuous) AS
+SELECT
+    app_id,
+    time_bucket('1 d'::interval, bucket) AS daily_bucket,
+    SUM(request_count)::BIGINT AS daily_request_count
+FROM hourly_requests_per_app
+GROUP BY app_id, daily_bucket
+WITH NO DATA;
+
+SELECT add_continuous_aggregate_policy('daily_requests_per_app',
+    start_offset => INTERVAL '3 d',
+    end_offset => INTERVAL '1 h',
+    schedule_interval => INTERVAL '12 h');
+
+----------------- Monthly requests per app -----------------
+CREATE MATERIALIZED VIEW monthly_requests_per_app
+WITH (timescaledb.continuous) AS
+SELECT
+    app_id,
+    time_bucket('1 month'::interval, daily_bucket) AS monthly_bucket,
+    SUM(daily_request_count)::BIGINT AS monthly_request_count
+FROM daily_requests_per_app
+GROUP BY app_id, monthly_bucket
+WITH NO DATA;
+
+SELECT add_continuous_aggregate_policy('monthly_requests_per_app',
+    start_offset => INTERVAL '3 month',
+    end_offset => INTERVAL '1 h',
+    schedule_interval => INTERVAL '1 month');
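Note: once requests start flowing, a dashboard-style read of the hourly aggregate defined above could look like the sketch below; 'some_app_id' is a placeholder, and bucket/request_count are the names this migration defines (they are renamed in PATCH 4/5):

-- Sketch: one app's request counts over the last 24 hours.
SELECT bucket, request_count
FROM hourly_requests_per_app
WHERE app_id = 'some_app_id'
  AND bucket >= now() - INTERVAL '24 h'
ORDER BY bucket DESC;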
From 3954e8b7069172210959282dd08a764a05a7c7fd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”>
Date: Wed, 21 Feb 2024 15:18:55 +0100
Subject: [PATCH 2/5] change db to TimescaleDB

---
 Cargo.toml                | 2 +-
 infra/docker-compose.yaml | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0e7db1e7..cebd79bc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,4 +28,4 @@ reqwest = "0.11.24"
 tokio = { version = "1.35.1", features = ["full"] }
 async-trait = "0.1.77"

-sqlx = { version = "0.7.3", features = [ "runtime-tokio", "tls-rustls", "macros", "postgres", "time"] }
\ No newline at end of file
+sqlx = { version = "0.7.3", features = [ "runtime-tokio", "tls-rustls", "macros", "postgres", "chrono"] }
\ No newline at end of file

diff --git a/infra/docker-compose.yaml b/infra/docker-compose.yaml
index a8f40b52..e3fb4940 100644
--- a/infra/docker-compose.yaml
+++ b/infra/docker-compose.yaml
@@ -1,8 +1,8 @@
 version: '3.9'

 services:
-  postgres:
-    image: postgres:16
+  timescaledb:
+    image: timescale/timescaledb-ha:pg16
     ports:
       - 5432:5432
     volumes:
@@ -13,3 +13,4 @@ services:
       - POSTGRES_USER
       - POSTGRES_PASSWORD
      - POSTGRES_DB
+      - TIMESCALEDB_TELEMETRY=off
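Note: a quick way to confirm the swapped image actually provides TimescaleDB is to check the extension catalog once the container is up (a sanity check, not part of the patch):

-- The timescale/timescaledb-ha image ships with the extension preinstalled;
-- this should return a single row with the installed version.
SELECT extname, extversion FROM pg_extension WHERE extname = 'timescaledb';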
From 7afc0b4c8a522787895f1ef86f847b0169dc286a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”>
Date: Wed, 21 Feb 2024 15:40:54 +0100
Subject: [PATCH 3/5] db update

---
 database/src/structs/client_data.rs          |  4 ++-
 database/src/structs/consts.rs               |  4 +--
 database/src/structs/filter_requests.rs      |  8 +++++
 database/src/structs/mod.rs                  |  2 ++
 database/src/structs/time_filters.rs         | 29 ++++++++++++++++
 database/src/tables/requests/table_struct.rs | 16 +++++----
 database/src/tables/requests/update.rs       | 22 +++++++-----
 database/src/tables/sessions/table_struct.rs | 20 ++++++-----
 database/src/tables/sessions/update.rs       | 35 ++++++++++++--------
 database/src/tables/test_utils.rs            | 25 ++++++++++++--
 database/src/tables/utils.rs                 |  5 +++
 11 files changed, 126 insertions(+), 44 deletions(-)
 create mode 100644 database/src/structs/filter_requests.rs
 create mode 100644 database/src/structs/time_filters.rs

diff --git a/database/src/structs/client_data.rs b/database/src/structs/client_data.rs
index eab5f3d1..8467ddf2 100644
--- a/database/src/structs/client_data.rs
+++ b/database/src/structs/client_data.rs
@@ -1,8 +1,10 @@
+use sqlx::types::chrono::{DateTime, Utc};
+
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct ClientData {
     pub client_id: Option<String>,
     pub device: Option<String>,
     pub metadata: Option<String>,
     pub notification_endpoint: Option<String>,
-    pub connected_at: u64, // Timestamp of when the client connected to the session
+    pub connected_at: DateTime<Utc>, // Timestamp of when the client connected to the session
 }

diff --git a/database/src/structs/consts.rs b/database/src/structs/consts.rs
index 0c5e1fb0..74e068ed 100644
--- a/database/src/structs/consts.rs
+++ b/database/src/structs/consts.rs
@@ -1,3 +1 @@
-pub const LAST_24_HOURS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')::BIGINT * 1000";
-pub const LAST_7_DAYS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '7 days')::BIGINT * 1000";
-pub const LAST_30_DAYS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '30 days')::BIGINT * 1000";
+pub const DAY_IN_SECONDS: u64 = 60 * 60 * 24; // 86400

diff --git a/database/src/structs/filter_requests.rs b/database/src/structs/filter_requests.rs
new file mode 100644
index 00000000..9a9c2840
--- /dev/null
+++ b/database/src/structs/filter_requests.rs
@@ -0,0 +1,8 @@
+use sqlx::types::chrono::{DateTime, Utc};
+
+#[derive(Debug, sqlx::FromRow)]
+pub struct AggregatedRequestCount {
+    pub app_id: String,
+    pub bucket: DateTime<Utc>,
+    pub request_count: i64,
+}

diff --git a/database/src/structs/mod.rs b/database/src/structs/mod.rs
index a9de028b..7a19a044 100644
--- a/database/src/structs/mod.rs
+++ b/database/src/structs/mod.rs
@@ -1,4 +1,6 @@
 pub mod client_data;
 pub mod consts;
+pub mod filter_requests;
 pub mod request_status;
 pub mod subscription;
+pub mod time_filters;

diff --git a/database/src/structs/time_filters.rs b/database/src/structs/time_filters.rs
new file mode 100644
index 00000000..a62dc1bc
--- /dev/null
+++ b/database/src/structs/time_filters.rs
@@ -0,0 +1,29 @@
+use super::consts::DAY_IN_SECONDS;
+use sqlx::types::chrono::{NaiveDate, Utc};
+use std::time::Duration;
+
+pub enum TimeFilter {
+    Last24Hours,
+    Last7Days,
+    Last30Days,
+}
+
+impl TimeFilter {
+    pub fn to_date(&self) -> NaiveDate {
+        let duration = match self {
+            TimeFilter::Last24Hours => Duration::from_secs(DAY_IN_SECONDS),
+            TimeFilter::Last7Days => Duration::from_secs(7 * DAY_IN_SECONDS),
+            TimeFilter::Last30Days => Duration::from_secs(30 * DAY_IN_SECONDS),
+        };
+        // Subtract the duration from the current time and convert to NaiveDate
+        (Utc::now() - duration).date_naive()
+    }
+
+    pub fn bucket_size(&self) -> &'static str {
+        match self {
+            TimeFilter::Last24Hours => "1 hour",
+            TimeFilter::Last7Days => "1 day",
+            TimeFilter::Last30Days => "1 day",
+        }
+    }
+}
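Note: to make the new TimeFilter concrete, Last7Days resolves to a NaiveDate cutoff seven days back plus a '1 day' bucket size, which the query built in PATCH 4/5 turns into roughly the following sketch (the literal date stands in for the computed cutoff, 'some_app_id' for a real app id):

-- Roughly what TimeFilter::Last7Days ultimately asks the database for.
SELECT app_id, daily_bucket, daily_request_count
FROM daily_requests_per_app
WHERE app_id = 'some_app_id'
  AND daily_bucket >= DATE '2024-02-14'
ORDER BY daily_bucket DESC;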
diff --git a/database/src/tables/requests/table_struct.rs b/database/src/tables/requests/table_struct.rs
index eb82e1aa..d369cfa8 100644
--- a/database/src/tables/requests/table_struct.rs
+++ b/database/src/tables/requests/table_struct.rs
@@ -1,31 +1,33 @@
-use sqlx::{postgres::PgRow, FromRow, Row};
-
 use crate::structs::request_status::RequestStatus;
+use chrono::{DateTime, Utc};
+use sqlx::types::chrono;
+use sqlx::{postgres::PgRow, FromRow, Row};

 pub const REQUESTS_TABLE_NAME: &str = "requests";
 pub const REQUESTS_KEYS: &str =
-    "request_id, request_type, session_id, request_status, network, creation_timestamp";
+    "request_id, session_id, app_id, request_type, request_status, network, creation_timestamp";

 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct Request {
     pub request_id: String,
-    pub request_type: String,
     pub session_id: String,
+    pub app_id: String,
+    pub request_type: String,
     pub request_status: RequestStatus,
     pub network: String,
-    pub creation_timestamp: u64,
+    pub creation_timestamp: DateTime<Utc>,
 }

 impl FromRow<'_, PgRow> for Request {
     fn from_row(row: &sqlx::postgres::PgRow) -> std::result::Result<Self, sqlx::Error> {
-        let creation_timestamp: i64 = row.get("creation_timestamp");
         Ok(Request {
             request_id: row.get("request_id"),
+            app_id: row.get("app_id"),
             request_type: row.get("request_type"),
             session_id: row.get("session_id"),
             request_status: row.get("request_status"),
             network: row.get("network"),
-            creation_timestamp: creation_timestamp as u64,
+            creation_timestamp: row.get("creation_timestamp"),
         })
     }
 }

diff --git a/database/src/tables/requests/update.rs b/database/src/tables/requests/update.rs
index 1bfe57b1..ea93c86f 100644
--- a/database/src/tables/requests/update.rs
+++ b/database/src/tables/requests/update.rs
@@ -5,17 +5,18 @@ use sqlx::query;
 impl Db {
     pub async fn save_request(&self, request: &Request) -> Result<(), sqlx::Error> {
         let query_body = format!(
-            "INSERT INTO {REQUESTS_TABLE_NAME} ({}) VALUES ($1, $2, $3, $4, $5, $6)",
+            "INSERT INTO {REQUESTS_TABLE_NAME} ({}) VALUES ($1, $2, $3, $4, $5, $6, $7)",
             REQUESTS_KEYS
         );

         let query_result = query(&query_body)
             .bind(&request.request_id)
-            .bind(&request.request_type)
             .bind(&request.session_id)
+            .bind(&request.app_id)
+            .bind(&request.request_type)
             .bind(&request.request_status)
             .bind(&request.network)
-            .bind(&(request.creation_timestamp as i64))
+            .bind(&request.creation_timestamp)
             .execute(&self.connection_pool)
             .await;

@@ -53,6 +54,7 @@ mod tests {
         structs::client_data::ClientData,
         tables::{
             registered_app::table_struct::RegisteredApp, sessions::table_struct::DbNcSession,
+            utils::get_date_time,
         },
     };

@@ -87,9 +89,9 @@
                 device: Some("test_device".to_string()),
                 metadata: Some("test_metadata".to_string()),
                 notification_endpoint: Some("test_notification_endpoint".to_string()),
-                connected_at: 12,
+                connected_at: get_date_time(10).unwrap(),
             }),
-            session_open_timestamp: 10,
+            session_open_timestamp: get_date_time(10).unwrap(),
             session_close_timestamp: None,
         };

@@ -102,7 +104,8 @@
             session_id: "test_session_id".to_string(),
             request_status: RequestStatus::Pending,
             network: "test_network".to_string(),
-            creation_timestamp: 10,
+            app_id: "test_app_id".to_string(),
+            creation_timestamp: get_date_time(10).unwrap(),
         };

         db.save_request(&request).await.unwrap();
@@ -119,8 +122,9 @@
             request_type: "test_request_type".to_string(),
             session_id: "test_session_id".to_string(),
             request_status: RequestStatus::Pending,
+            app_id: "test_app_id".to_string(),
             network: "test_network".to_string(),
-            creation_timestamp: 12,
+            creation_timestamp: get_date_time(10).unwrap(),
         };

         db.save_request(&second_request).await.unwrap();
@@ -130,8 +134,8 @@
             .await
             .unwrap();
         assert_eq!(requests.len(), 2);
-        assert_eq!(second_request, requests[0]);
-        assert_eq!(request, requests[1]);
+        assert_eq!(request, requests[0]);
+        assert_eq!(second_request, requests[1]);

         db.update_request_status(&request.request_id, &RequestStatus::Completed)
             .await
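Note: with the reordered REQUESTS_KEYS, the INSERT that save_request builds expands as shown below; the bind order in the Rust code above has to match this column list exactly:

-- Expansion of the format! in save_request ($1..$7 follow the bind order).
INSERT INTO requests (request_id, session_id, app_id, request_type,
                      request_status, network, creation_timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7);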
diff --git a/database/src/tables/sessions/table_struct.rs b/database/src/tables/sessions/table_struct.rs
index d1b436ca..72b5e8a2 100644
--- a/database/src/tables/sessions/table_struct.rs
+++ b/database/src/tables/sessions/table_struct.rs
@@ -1,5 +1,9 @@
 use crate::structs::client_data::ClientData;
-use sqlx::{postgres::PgRow, FromRow, Row};
+use sqlx::{
+    postgres::PgRow,
+    types::chrono::{DateTime, Utc},
+    FromRow, Row,
+};

 pub const SESSIONS_TABLE_NAME: &str = "sessions";
 pub const SESSIONS_KEYS: &str =
@@ -14,15 +18,13 @@ pub struct DbNcSession {
     pub persistent: bool,
     pub network: String,
     pub client: Option<ClientData>, // Some if user has ever connected to the session
-    pub session_open_timestamp: u64,
-    pub session_close_timestamp: Option<u64>,
+    pub session_open_timestamp: DateTime<Utc>,
+    pub session_close_timestamp: Option<DateTime<Utc>>,
 }

 impl FromRow<'_, PgRow> for DbNcSession {
     fn from_row(row: &sqlx::postgres::PgRow) -> std::result::Result<Self, sqlx::Error> {
-        let session_open_timestamp: i64 = row.get("session_open_timestamp");
-        let session_close_timestamp: Option<i64> = row.get("session_close_timestamp");
-        let client_connected_at: Option<i64> = row.get("client_connected_at");
+        let client_connected_at: Option<DateTime<Utc>> = row.get("client_connected_at");
         Ok(DbNcSession {
             app_id: row.get("app_id"),
             app_metadata: row.get("app_metadata"),
@@ -37,12 +39,12 @@
                 device: row.get("client_device"),
                 metadata: row.get("client_metadata"),
                 notification_endpoint: row.get("client_notification_endpoint"),
-                connected_at: client_connected_at as u64,
+                connected_at: client_connected_at,
             }),
             None => None,
         },
-            session_open_timestamp: session_open_timestamp as u64,
-            session_close_timestamp: session_close_timestamp.map(|x| x as u64),
+            session_open_timestamp: row.get("session_open_timestamp"),
+            session_close_timestamp: row.get("session_close_timestamp"),
         })
     }
 }

diff --git a/database/src/tables/sessions/update.rs b/database/src/tables/sessions/update.rs
index f97ae3a3..6e6c44bd 100644
--- a/database/src/tables/sessions/update.rs
+++ b/database/src/tables/sessions/update.rs
@@ -1,6 +1,9 @@
 use super::table_struct::{DbNcSession, SESSIONS_KEYS, SESSIONS_TABLE_NAME};
-use crate::db::Db;
-use sqlx::query;
+use crate::{db::Db, tables::utils::get_date_time};
+use sqlx::{
+    query,
+    types::chrono::{DateTime, Utc},
+};

 impl Db {
     pub async fn save_new_session(&self, session: &DbNcSession) -> Result<(), sqlx::Error> {
@@ -16,7 +19,7 @@
                 &client.device,
                 &client.metadata,
                 &client.notification_endpoint,
-                Some(client.connected_at.clone() as i64),
+                Some(client.connected_at.clone()),
             ),
             None => (&None, &None, &None, &None, None),
         };
@@ -33,8 +36,8 @@
             .bind(&metadata)
             .bind(&notification_endpoint)
             .bind(&connected_at)
-            .bind(&(session.session_open_timestamp as i64))
-            .bind(&None::<i64>)
+            .bind(&session.session_open_timestamp)
+            .bind(&None::<DateTime<Utc>>)
             .execute(&self.connection_pool)
             .await;

@@ -54,7 +57,7 @@
         );

         let query_result = query(&query_body)
-            .bind(close_timestamp as i64)
+            .bind(get_date_time(close_timestamp))
             .bind(session_id)
             .execute(&self.connection_pool)
             .await;

@@ -72,7 +75,10 @@ mod tests {
     use super::*;
     use crate::{
         structs::{client_data::ClientData, request_status::RequestStatus},
-        tables::{registered_app::table_struct::RegisteredApp, requests::table_struct::Request},
+        tables::{
+            registered_app::table_struct::RegisteredApp, requests::table_struct::Request,
+            utils::get_date_time,
+        },
     };

@@ -105,9 +111,9 @@
                 device: Some("test_device".to_string()),
                 metadata: Some("test_metadata".to_string()),
                 notification_endpoint: Some("test_notification_endpoint".to_string()),
-                connected_at: 12,
+                connected_at: get_date_time(10).unwrap(),
             }),
-            session_open_timestamp: 10,
+            session_open_timestamp: get_date_time(10).unwrap(),
             session_close_timestamp: None,
         };

@@ -136,25 +142,27 @@
             .await
             .unwrap()
             .unwrap();
-        assert_eq!(session.session_close_timestamp, Some(15));
+        assert_eq!(session.session_close_timestamp, get_date_time(15));

         // Create a few requests for the session
         let request = Request {
             request_id: "test_request_id".to_string(),
             request_type: "test_request_type".to_string(),
+            app_id: "test_app_id".to_string(),
             session_id: "test_session_id".to_string(),
             request_status: RequestStatus::Pending,
             network: "test_network".to_string(),
-            creation_timestamp: 13,
+            creation_timestamp: get_date_time(12).unwrap(),
         };

         let second_request = Request {
             request_id: "test_request_id2".to_string(),
             request_type: "test_request_type".to_string(),
             session_id: "test_session_id".to_string(),
+            app_id: "test_app_id".to_string(),
             request_status: RequestStatus::Pending,
             network: "test_network".to_string(),
-            creation_timestamp: 13,
+            creation_timestamp: get_date_time(13).unwrap(),
         };

         db.save_request(&request).await.unwrap();
@@ -167,6 +175,7 @@
             .unwrap();

         assert_eq!(requests.len(), 2);
-        assert_eq!(request, requests[0]);
+        assert_eq!(request, requests[1]);
+        assert_eq!(second_request, requests[0]);
     }
 }
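Note: close_session now binds a TIMESTAMPTZ ($1) and a session id ($2); the UPDATE itself is built outside the visible hunk, so the following is only an assumed shape, not the patch's literal statement:

-- Hypothetical shape of the close_session statement (the real query_body
-- sits outside the hunk shown above).
UPDATE sessions SET session_close_timestamp = $1 WHERE session_id = $2;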
diff --git a/database/src/tables/test_utils.rs b/database/src/tables/test_utils.rs
index f4c29757..d35628e7 100644
--- a/database/src/tables/test_utils.rs
+++ b/database/src/tables/test_utils.rs
@@ -11,6 +11,11 @@ pub mod test_utils {
             .fetch_all(&self.connection_pool)
             .await?;

+            if rows.is_empty() {
+                println!("No tables to truncate");
+                return Ok(());
+            }
+
             // Join all names except _sqlx_migrations into a single string and run single truncate
             let tables_names = rows
                 .iter()
@@ -23,7 +28,23 @@ pub mod test_utils {
             sqlx::query(&query).execute(&self.connection_pool).await?;
             Ok(())
         }
-    }
-
+
+        pub async fn refresh_continuous_aggregates(&self) -> Result<(), sqlx::Error> {
+            // Refresh the hourly_requests_per_app view
+            let _ = sqlx::query(
+                "CALL refresh_continuous_aggregate('hourly_requests_per_app', NULL, NULL)",
+            )
+            .execute(&self.connection_pool)
+            .await?;
+
+            // Refresh the daily_requests_per_app view
+            let _ = sqlx::query(
+                "CALL refresh_continuous_aggregate('daily_requests_per_app', NULL, NULL)",
+            )
+            .execute(&self.connection_pool)
+            .await?;
+
+            Ok(())
+        }
+    }
 }

diff --git a/database/src/tables/utils.rs b/database/src/tables/utils.rs
index b22a0b48..49273f8e 100644
--- a/database/src/tables/utils.rs
+++ b/database/src/tables/utils.rs
@@ -1,3 +1,4 @@
+use sqlx::types::chrono::{DateTime, TimeZone, Utc};
 use std::time::{SystemTime, UNIX_EPOCH};

 pub fn get_timestamp_in_milliseconds() -> u64 {
@@ -5,3 +6,7 @@ pub fn get_timestamp_in_milliseconds() -> u64 {
     let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     since_the_epoch.as_millis() as u64
 }
+
+pub fn get_date_time(timestamp: u64) -> Option<DateTime<Utc>> {
+    Utc.timestamp_millis_opt(timestamp as i64).single()
+}
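Note: get_date_time converts epoch milliseconds into a DateTime<Utc> on the Rust side; should the same conversion ever be needed in SQL, the equivalent is the sketch below (the input is an arbitrary example value):

-- SQL-side counterpart of get_date_time: to_timestamp takes seconds,
-- hence the division by 1000.0.
SELECT to_timestamp(1708527509123 / 1000.0) AS created_at;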
From cf93a575127bd3b66da01c13e399efb3b1e81498 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”>
Date: Wed, 21 Feb 2024 16:19:13 +0100
Subject: [PATCH 4/5] fix test

---
 .../migrations/000000000007_app_queries.sql  |  10 +-
 database/src/tables/registered_app/select.rs |  48 ++--
 database/src/tables/registered_app/update.rs | 220 +++++-------------
 3 files changed, 97 insertions(+), 181 deletions(-)

diff --git a/database/migrations/000000000007_app_queries.sql b/database/migrations/000000000007_app_queries.sql
index 2c5332c8..2dbb0843 100644
--- a/database/migrations/000000000007_app_queries.sql
+++ b/database/migrations/000000000007_app_queries.sql
@@ -3,10 +3,10 @@ CREATE MATERIALIZED VIEW hourly_requests_per_app
 WITH (timescaledb.continuous)
 AS SELECT
     app_id,
-    time_bucket('1 h'::interval, creation_timestamp) as bucket,
-    COUNT(*) AS request_count
+    time_bucket('1 h'::interval, creation_timestamp) as hourly_bucket,
+    COUNT(*) AS hourly_request_count
 FROM requests
-GROUP BY app_id, bucket
+GROUP BY app_id, hourly_bucket
 WITH NO DATA;

 SELECT add_continuous_aggregate_policy('hourly_requests_per_app',
@@ -19,8 +19,8 @@ CREATE MATERIALIZED VIEW daily_requests_per_app
 WITH (timescaledb.continuous) AS
 SELECT
     app_id,
-    time_bucket('1 d'::interval, bucket) AS daily_bucket,
-    SUM(request_count)::BIGINT AS daily_request_count
+    time_bucket('1 d'::interval, hourly_bucket) AS daily_bucket,
+    SUM(hourly_request_count)::BIGINT AS daily_request_count
 FROM hourly_requests_per_app
 GROUP BY app_id, daily_bucket
 WITH NO DATA;
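Note: because daily_requests_per_app is defined on top of hourly_requests_per_app, manual refreshes have to run bottom-up, exactly as the refresh_continuous_aggregates helper in PATCH 3/5 does; a bounded variant of the same calls (window values illustrative) would be:

-- Refresh in dependency order: hourly first, then daily.
-- The NULL, NULL form used in test_utils refreshes the entire range instead.
CALL refresh_continuous_aggregate('hourly_requests_per_app', now() - INTERVAL '7 days', now());
CALL refresh_continuous_aggregate('daily_requests_per_app', now() - INTERVAL '7 days', now());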
diff --git a/database/src/tables/registered_app/select.rs b/database/src/tables/registered_app/select.rs
index 0eb56b72..bfe94960 100644
--- a/database/src/tables/registered_app/select.rs
+++ b/database/src/tables/registered_app/select.rs
@@ -1,7 +1,9 @@
 use super::table_struct::{RegisteredApp, REGISTERED_APPS_TABLE_NAME};
+use crate::structs::filter_requests::AggregatedRequestCount;
+use crate::structs::time_filters::TimeFilter;
 use crate::tables::requests::table_struct::REQUESTS_TABLE_NAME;
 use crate::{db::Db, tables::requests::table_struct::Request};
-use sqlx::query_as;
+use sqlx::{query_as, Error};

 impl Db {
     pub async fn get_registered_app_by_app_id(
@@ -35,22 +37,42 @@ impl Db {
             .await;
     }

-    pub async fn get_requests_by_app_id_with_filter(
+    pub async fn get_aggregated_requests_by_app_id(
         &self,
-        app_id: &String,
-        filter: &str,
-    ) -> Result<Vec<Request>, sqlx::Error> {
+        app_id: &str,
+        filter: TimeFilter,
+    ) -> Result<Vec<AggregatedRequestCount>, Error> {
+        let start_date = filter.to_date();
+        let bucket_size = filter.bucket_size();
+
+        // Correctly selecting the view based on the bucket_size
+        let (view_name, bucket, request_count) = match bucket_size {
+            "1 hour" => (
+                "hourly_requests_per_app",
+                "hourly_bucket",
+                "hourly_request_count",
+            ),
+            "1 day" => (
+                "daily_requests_per_app",
+                "daily_bucket",
+                "daily_request_count",
+            ),
+            // for now return WorkerCrashed but later create custom error
+            _ => return Err(Error::WorkerCrashed),
+        };
+
         let query = format!(
-            "SELECT r.* FROM {REQUESTS_TABLE_NAME} r
-            INNER JOIN sessions s ON r.session_id = s.session_id
-            WHERE s.app_id = $1 AND creation_timestamp >= {filter}
-            ORDER BY r.creation_timestamp DESC"
+            "SELECT app_id, {} as bucket, {} as request_count
+            FROM {}
+            WHERE app_id = $1 AND {} >= $2
+            ORDER BY {} DESC",
+            bucket, request_count, view_name, bucket, bucket
         );

-        let typed_query = query_as::<_, Request>(&query);
-        return typed_query
-            .bind(&app_id)
+        sqlx::query_as::<_, AggregatedRequestCount>(&query)
+            .bind(app_id)
+            .bind(start_date)
             .fetch_all(&self.connection_pool)
-            .await;
+            .await
     }
 }
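Note: for TimeFilter::Last24Hours the format! above therefore produces the statement below, with $1 bound to the app id and $2 to the NaiveDate cutoff; the aliases line the columns up with AggregatedRequestCount's fields:

SELECT app_id, hourly_bucket as bucket, hourly_request_count as request_count
FROM hourly_requests_per_app
WHERE app_id = $1 AND hourly_bucket >= $2
ORDER BY hourly_bucket DESC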
diff --git a/database/src/tables/registered_app/update.rs b/database/src/tables/registered_app/update.rs
index 99e5e56e..6a51a48f 100644
--- a/database/src/tables/registered_app/update.rs
+++ b/database/src/tables/registered_app/update.rs
@@ -50,146 +50,16 @@ mod tests {
     use crate::{
         structs::{
-            consts::{LAST_24_HOURS, LAST_30_DAYS, LAST_7_DAYS},
-            request_status::RequestStatus,
+            consts::DAY_IN_SECONDS, request_status::RequestStatus, time_filters::TimeFilter,
         },
         tables::{
             registered_app::table_struct::RegisteredApp, requests::table_struct::Request,
-            sessions::table_struct::DbNcSession, utils::get_timestamp_in_milliseconds,
+            sessions::table_struct::DbNcSession,
         },
     };
-
-    #[tokio::test]
-    async fn test_register_app() {
-        let db = super::Db::connect_to_the_pool().await;
-        db.truncate_all_tables().await.unwrap();
-
-        let app = RegisteredApp {
-            app_id: "test_app_id".to_string(),
-            app_name: "test_app_name".to_string(),
-            whitelisted_domains: vec!["test_domain".to_string()],
-            subscription: None,
-            ack_public_keys: vec!["test_key".to_string()],
-            email: None,
-            registration_timestamp: 0,
-            pass_hash: None,
-        };
-
-        db.register_new_app(&app).await.unwrap();
-
-        let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
-        assert_eq!(app, result);
-    }
-
-    #[tokio::test]
-    async fn test_get_requests() {
-        let db = super::Db::connect_to_the_pool().await;
-        db.truncate_all_tables().await.unwrap();
-
-        // "Register" an app
-        let app = RegisteredApp {
-            app_id: "test_app_id".to_string(),
-            app_name: "test_app_name".to_string(),
-            whitelisted_domains: vec!["test_domain".to_string()],
-            subscription: None,
-            ack_public_keys: vec!["test_key".to_string()],
-            email: None,
-            registration_timestamp: 0,
-            pass_hash: None,
-        };
-
-        db.register_new_app(&app).await.unwrap();
-
-        let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
-        assert_eq!(app, result);
-
-        // Create 2 sessions
-        let session = DbNcSession {
-            session_id: "test_session_id".to_string(),
-            app_id: "test_app_id".to_string(),
-            app_metadata: "test_app_metadata".to_string(),
-            app_ip_address: "test_app_ip_address".to_string(),
-            persistent: false,
-            network: "test_network".to_string(),
-            client: None,
-            session_open_timestamp: 10,
-            session_close_timestamp: None,
-        };
-
-        let second_session = DbNcSession {
-            session_id: "test_session_id_2".to_string(),
-            app_id: "test_app_id".to_string(),
-            app_metadata: "test_app_metadata".to_string(),
-            app_ip_address: "test_app_ip_address".to_string(),
-            persistent: false,
-            network: "test_network".to_string(),
-            client: None,
-            session_open_timestamp: 12,
-            session_close_timestamp: None,
-        };
-
-        db.save_new_session(&session).await.unwrap();
-        db.save_new_session(&second_session).await.unwrap();
-
-        let result = db.get_sessions_by_app_id(&app.app_id).await.unwrap();
-        assert_eq!(result.len(), 2);
-        assert_eq!(second_session, result[0]);
-        assert_eq!(session, result[1]);
-
-        // Create 2 requests per session
-        // First session
-        let request = Request {
-            request_id: "test_request_id".to_string(),
-            session_id: "test_session_id".to_string(),
-            network: "test_network".to_string(),
-            creation_timestamp: 10,
-            request_status: RequestStatus::Pending,
-            request_type: "test_request_type".to_string(),
-        };
-
-        let second_request = Request {
-            request_id: "test_request_id_2".to_string(),
-            session_id: "test_session_id".to_string(),
-            network: "test_network".to_string(),
-            creation_timestamp: 12,
-            request_status: RequestStatus::Pending,
-            request_type: "test_request_type".to_string(),
-        };
-
-        db.save_request(&request).await.unwrap();
-        db.save_request(&second_request).await.unwrap();
-
-        // Second session
-        let third_request = Request {
-            request_id: "test_request_id_3".to_string(),
-            session_id: "test_session_id_2".to_string(),
-            network: "test_network".to_string(),
-            creation_timestamp: 14,
-            request_status: RequestStatus::Pending,
-            request_type: "test_request_type".to_string(),
-        };
-
-        let fourth_request = Request {
-            request_id: "test_request_id_4".to_string(),
-            session_id: "test_session_id_2".to_string(),
-            network: "test_network".to_string(),
-            creation_timestamp: 16,
-            request_status: RequestStatus::Pending,
-            request_type: "test_request_type".to_string(),
-        };
-
-        db.save_request(&third_request).await.unwrap();
-        db.save_request(&fourth_request).await.unwrap();
-
-        // Get all requests by app_id
-        let result = db.get_requests_by_app_id(&app.app_id).await.unwrap();
-        assert_eq!(result.len(), 4);
-
-        assert_eq!(result[0], fourth_request);
-        assert_eq!(result[1], third_request);
-        assert_eq!(result[2], second_request);
-        assert_eq!(result[3], request);
-    }
+    use sqlx::types::chrono::{DateTime, Utc};
+    use std::{sync::Arc, time::Duration};
+    use tokio::task;

     #[tokio::test]
     async fn test_data_ranges() {
         let db = super::Db::connect_to_the_pool().await;
         db.truncate_all_tables().await.unwrap();

         // "Register" an app
         let app = RegisteredApp {
             app_id: "test_app_id".to_string(),
             app_name: "test_app_name".to_string(),
             whitelisted_domains: vec!["test_domain".to_string()],
             subscription: None,
             ack_public_keys: vec!["test_key".to_string()],
             email: None,
             registration_timestamp: 0,
             pass_hash: None,
         };

         db.register_new_app(&app).await.unwrap();

         let result = db.get_registered_app_by_app_id(&app.app_id).await.unwrap();
         assert_eq!(app, result);

         let session = DbNcSession {
             session_id: "test_session_id".to_string(),
             app_id: "test_app_id".to_string(),
             app_metadata: "test_app_metadata".to_string(),
             app_ip_address: "test_app_ip_address".to_string(),
             persistent: false,
             network: "test_network".to_string(),
             client: None,
-            session_open_timestamp: 10,
+            session_open_timestamp: DateTime::from(Utc::now()),
             session_close_timestamp: None,
         };

         db.save_new_session(&session).await.unwrap();

         let result = db.get_sessions_by_app_id(&app.app_id).await.unwrap();
         assert_eq!(result.len(), 1);
-        assert_eq!(session, result[0]);
+        // assert_eq!(session, result[0]);
+
+        let db_arc = Arc::new(db);
+        let mut tasks = Vec::new();

-        let now = get_timestamp_in_milliseconds();
-        // Create requests across last 33 days, 3 requests per day
         for i in 0..33 {
-            for j in 0..3 {
-                let request = Request {
-                    request_id: format!("test_request_id_{}_{}", i, j),
-                    session_id: "test_session_id".to_string(),
-                    network: "test_network".to_string(),
-                    creation_timestamp: (now - (i * 24 * 60 * 60 * 1000) - ((j + 1) * 10000))
-                        as u64,
-                    request_status: RequestStatus::Pending,
-                    request_type: "test_request_type".to_string(),
-                };
+            let db_clone = db_arc.clone(); // Clone the db connection or pool if needed
+            tasks.push(task::spawn(async move {
+                for j in 0..100 - i {
+                    let creation_time: DateTime<Utc> = Utc::now()
+                        - Duration::from_secs(i as u64 * DAY_IN_SECONDS as u64)
+                        - Duration::from_millis((j + 1) as u64 * 100);
+
+                    let request = Request {
+                        request_id: format!("test_request_id_{}_{}", i, j),
+                        app_id: "test_app_id".to_string(),
+                        session_id: "test_session_id".to_string(),
+                        network: "test_network".to_string(),
+                        creation_timestamp: creation_time,
+                        request_status: RequestStatus::Pending,
+                        request_type: "test_request_type".to_string(),
+                    };
+
+                    if let Err(e) = db_clone.save_request(&request).await {
+                        eprintln!("Failed to save request: {}", e);
+                    }
+                }
+            }));
+        }

-                db.save_request(&request).await.unwrap();
-            }
+        // Await all tasks to complete
+        for task in tasks {
+            task.await.unwrap();
         }

-        // Query last 30 days
-        let result = db
-            .get_requests_by_app_id_with_filter(&app.app_id, LAST_30_DAYS)
+        // We need to refresh manually the views
+        db_arc.refresh_continuous_aggregates().await.unwrap();
+
+        let result = db_arc
+            .get_aggregated_requests_by_app_id(&app.app_id, TimeFilter::Last24Hours)
             .await
             .unwrap();
-        assert_eq!(result.len(), 30 * 3);
-        // Query last 7 days
-        let result = db
-            .get_requests_by_app_id_with_filter(&app.app_id, LAST_7_DAYS)
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0].request_count, 100);
+        assert_eq!(result[1].request_count, 99);
+
+        let result = db_arc
+            .get_aggregated_requests_by_app_id(&app.app_id, TimeFilter::Last7Days)
             .await
             .unwrap();
-        assert_eq!(result.len(), 7 * 3);
-        // Query last 24 hours
-        let result = db
-            .get_requests_by_app_id_with_filter(&app.app_id, LAST_24_HOURS)
+        assert_eq!(result.len(), 8);
+        assert_eq!(result[0].request_count, 100);
+        assert_eq!(result[7].request_count, 93);
+
+        let result = db_arc
+            .get_aggregated_requests_by_app_id(&app.app_id, TimeFilter::Last30Days)
             .await
             .unwrap();
-        assert_eq!(result.len(), 3);
+
+        assert_eq!(result.len(), 31);
+        assert_eq!(result[0].request_count, 100);
+        assert_eq!(result[30].request_count, 70);
     }
 }
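Note: the seeding loop above inserts 100 - i requests for day i, i = 0..32, so the requests table should end up with 68 + 69 + ... + 100 = 2772 rows; a quick count against the seeded test database confirms the arithmetic:

-- Sum of (100 - i) for i in 0..33 is 2772.
SELECT COUNT(*) FROM requests WHERE app_id = 'test_app_id';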
From 3df0a77fef3966e898576b15d10c98edbf0f44ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”>
Date: Mon, 26 Feb 2024 13:02:30 +0100
Subject: [PATCH 5/5] update pnpm lock

---
 sdk/pnpm-lock.yaml | 70 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 67 insertions(+), 3 deletions(-)

diff --git a/sdk/pnpm-lock.yaml b/sdk/pnpm-lock.yaml
index 55b2d5b7..2af1fe5a 100644
--- a/sdk/pnpm-lock.yaml
+++ b/sdk/pnpm-lock.yaml
@@ -142,7 +142,7 @@ importers:
         version: link:../sui
       '@nightlylabs/wallet-selector-polkadot':
         specifier: 0.2.0
-        version: link:../../packages/selector-polkadot
+        version: 0.2.0(@polkadot/util@12.5.1)
      '@nightlylabs/wallet-selector-solana':
         specifier: 0.2.7
         version: link:../../packages/selector-solana
@@ -592,7 +592,7 @@ importers:
         specifier: 0.0.27
         version: link:../../apps/base
       '@nightlylabs/wallet-selector-modal':
-        specifier: 0.1.2
+        specifier: 0.2.0
         version: link:../modal
       '@wallet-standard/core':
         specifier: ^1.0.3
@@ -632,7 +632,7 @@
         specifier: ^0.0.15
         version: link:../../apps/polkadot
       '@nightlylabs/wallet-selector-base':
-        specifier: ^0.3.0
+        specifier: ^0.3.1
         version: link:../selector-base
       '@polkadot/api':
         specifier: 10.10.1

+  /@nightlylabs/nightly-connect-polkadot@0.0.15:
+    resolution: {integrity: sha512-WCsumvHwhPipbxPQoswKCwHykwJ48Dffwb9hCf7zjCgEysIBCnA6Dzj/2G80drLqYYpS285nMa8z+3NaXVu2dA==}
+    dependencies:
+      '@nightlylabs/nightly-connect-base': 0.0.27
+      '@polkadot/api': 10.10.1
+      '@polkadot/extension-inject': 0.46.5(@polkadot/api@10.10.1)(@polkadot/util@12.5.1)
+      '@polkadot/types': 10.10.1
+      '@polkadot/util': 12.5.1
+      '@polkadot/util-crypto': 12.5.1(@polkadot/util@12.5.1)
+      eventemitter3: 5.0.1
+      uuid: 9.0.0
+    transitivePeerDependencies:
+      - bufferutil
+      - encoding
+      - supports-color
+      - utf-8-validate
+    dev: false
+
   /@nightlylabs/nightly-connect-solana@0.0.28:
     resolution: {integrity: sha512-8PBkmuXzWZNPqu6SGT2tsGK4DgD3yswQsUVb3L+GgFGCdQI7eUqyHd2ofWFWzEgj4a1XuixA29ZcSyw20ajgzw==}
     dependencies:

+  /@nightlylabs/wallet-selector-base@0.3.1:
+    resolution: {integrity: sha512-m2hdNkOrQNS52xXYSvko8YvbI60miCU9AHO8HHKfGXiuYgUjAmQSeTm1wLP8LZI4+Mygqbr8Oq9na+HMEgq2XA==}
+    dependencies:
+      '@nightlylabs/nightly-connect-base': 0.0.27
+      '@nightlylabs/wallet-selector-modal': 0.2.0
+      '@wallet-standard/core': 1.0.3
+      isomorphic-localstorage: 1.0.2
+    transitivePeerDependencies:
+      - bufferutil
+      - encoding
+      - supports-color
+      - ts-node
+      - utf-8-validate
+    dev: false
+
   /@nightlylabs/wallet-selector-modal@0.1.2:
     resolution: {integrity: sha512-vxy9S2dEf3NARW6LDq2ZKpWMlk5JJFIuwUfSxkuJlgUg2OVSlnDS7vdho3h4DmluRU5GM9vVhaXUGHAVp5sDQg==}
     dependencies:
       '@nightlylabs/qr-code': 2.0.4
       autoprefixer: 10.4.14(postcss@8.4.24)
       lit: 2.7.2
       postcss: 8.4.24
       postcss-lit: 1.1.0(postcss@8.4.24)
       tailwindcss: 3.3.2
     transitivePeerDependencies:
       - supports-color
       - ts-node
     dev: false

+  /@nightlylabs/wallet-selector-modal@0.2.0:
+    resolution: {integrity: sha512-BdEk3FhL65z/X0N9ygPjk7uQvV0GGHTWSwXBVob/l48Nok4ikFcV30Dtxk6iSSbErDZ8U4zV3/78cK+m+4lt8A==}
+    dependencies:
+      '@nightlylabs/qr-code': 2.0.4
+      autoprefixer: 10.4.14(postcss@8.4.24)
+      lit: 2.7.2
+      postcss: 8.4.24
+      postcss-lit: 1.1.0(postcss@8.4.24)
+      tailwindcss: 3.3.2
+    transitivePeerDependencies:
+      - supports-color
+      - ts-node
+    dev: false
+
+  /@nightlylabs/wallet-selector-polkadot@0.2.0(@polkadot/util@12.5.1):
+    resolution: {integrity: sha512-T4C6J+RVBif8H742LdeZfIRZyFZV79CAWL6/w3keQmyi8ZmTfmmxdVaXkSE5Va11j3xt2A298mugbbujluW6bw==}
+    dependencies:
+      '@nightlylabs/nightly-connect-polkadot': 0.0.15
+      '@nightlylabs/wallet-selector-base': 0.3.1
+      '@polkadot/api': 10.10.1
+      '@polkadot/extension-inject': 0.46.5(@polkadot/api@10.10.1)(@polkadot/util@12.5.1)
+      '@wallet-standard/core': 1.0.3
+    transitivePeerDependencies:
+      - '@polkadot/util'
+      - bufferutil
+      - encoding
+      - supports-color
+      - ts-node
+      - utf-8-validate
+    dev: false
+
   /@nightlylabs/wallet-selector-solana@0.2.6(bs58@4.0.1)(react@18.2.0):
     resolution: {integrity: sha512-cVTKk+c6tGv4GeSQMlUaZ2si4A6ySKj41emkGJ8OtuwmtzwUym4Xuh3chXZYgGrMQgvPrX5+erIR4oq2GmGIPg==}
     dependencies: