Skip to content

Commit

Permalink
Merge pull request #99 from nightly-labs/queries-improvements
Browse files Browse the repository at this point in the history
Replace Postgress with Timescale
  • Loading branch information
Giems authored Feb 26, 2024
2 parents ac1bb7b + 3df0a77 commit 5098101
Show file tree
Hide file tree
Showing 20 changed files with 349 additions and 250 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ reqwest = "0.11.24"
tokio = { version = "1.35.1", features = ["full"] }
async-trait = "0.1.77"

sqlx = { version = "0.7.3", features = [ "runtime-tokio", "tls-rustls", "macros", "postgres", "time"] }
sqlx = { version = "0.7.3", features = [ "runtime-tokio", "tls-rustls", "macros", "postgres", "chrono"] }
18 changes: 5 additions & 13 deletions database/migrations/000000000003_sessions.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE TABLE sessions(
session_id TEXT NOT NULL UNIQUE,
app_id TEXT NOT NULL,
session_id TEXT NOT NULL,
app_id TEXT NOT NULL REFERENCES registered_apps(app_id),
app_metadata TEXT NOT NULL,
app_ip_address TEXT NOT NULL,
persistent BOOLEAN NOT NULL,
Expand All @@ -9,15 +9,7 @@ CREATE TABLE sessions(
client_device TEXT,
client_metadata TEXT,
client_notification_endpoint TEXT,
client_connected_at BIGINT,
session_open_timestamp BIGINT NOT NULL,
session_close_timestamp BIGINT
client_connected_at TIMESTAMPTZ,
session_open_timestamp TIMESTAMPTZ NOT NULL,
session_close_timestamp TIMESTAMPTZ
);

CREATE UNIQUE INDEX sessions_session_id ON sessions(session_id);

ALTER TABLE sessions
ADD CONSTRAINT fk_sessions_registered_apps
FOREIGN KEY (app_id)
REFERENCES registered_apps (app_id)
ON DELETE CASCADE;
15 changes: 4 additions & 11 deletions database/migrations/000000000005_requests.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
CREATE TABLE requests(
request_id TEXT NOT NULL UNIQUE,
request_type TEXT NOT NULL,
request_id TEXT NOT NULL,
session_id TEXT NOT NULL,
app_id TEXT NOT NULL,
request_type TEXT NOT NULL,
request_status request_status_enum NOT NULL,
network TEXT NOT NULL,
creation_timestamp BIGINT NOT NULL
creation_timestamp TIMESTAMPTZ NOT NULL
);

CREATE UNIQUE INDEX requests_request_id ON requests(request_id);

ALTER TABLE requests
ADD CONSTRAINT fk_requests_sessions
FOREIGN KEY (session_id)
REFERENCES sessions (session_id)
ON DELETE CASCADE;
2 changes: 2 additions & 0 deletions database/migrations/000000000006_create_hypertables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SELECT create_hypertable('sessions', 'session_open_timestamp');
SELECT create_hypertable('requests', 'creation_timestamp');
49 changes: 49 additions & 0 deletions database/migrations/000000000007_app_queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
----------------- Hourly requests per app -----------------
CREATE MATERIALIZED VIEW hourly_requests_per_app
WITH (timescaledb.continuous)
AS SELECT
app_id,
time_bucket('1 h'::interval, creation_timestamp) as hourly_bucket,
COUNT(*) AS hourly_request_count
FROM requests
GROUP BY app_id, hourly_bucket
WITH NO DATA;

SELECT add_continuous_aggregate_policy('hourly_requests_per_app',
start_offset => INTERVAL '3 h',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '1 h');

----------------- Daily requests per app -----------------
CREATE MATERIALIZED VIEW daily_requests_per_app
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 d'::interval, hourly_bucket) AS daily_bucket,
SUM(hourly_request_count)::BIGINT AS daily_request_count
FROM hourly_requests_per_app
GROUP BY app_id, daily_bucket
WITH NO DATA;

SELECT add_continuous_aggregate_policy('daily_requests_per_app',
start_offset => INTERVAL '3 d',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '12 h');

----------------- Monthly requests per app -----------------
CREATE MATERIALIZED VIEW monthly_requests_per_app
WITH (timescaledb.continuous) AS
SELECT
app_id,
time_bucket('1 month'::interval, daily_bucket) AS monthly_bucket,
SUM(daily_request_count)::BIGINT AS monthly_request_count
FROM daily_requests_per_app
GROUP BY app_id, monthly_bucket
WITH NO DATA;

SELECT add_continuous_aggregate_policy('monthly_requests_per_app',
start_offset => INTERVAL '3 month',
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '1 month');


4 changes: 3 additions & 1 deletion database/src/structs/client_data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use sqlx::types::chrono::{DateTime, Utc};

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ClientData {
pub client_id: Option<String>,
pub device: Option<String>,
pub metadata: Option<String>,
pub notification_endpoint: Option<String>,
pub connected_at: u64, // Timestamp of when the client connected to the session
pub connected_at: DateTime<Utc>, // Timestamp of when the client connected to the session
}
4 changes: 1 addition & 3 deletions database/src/structs/consts.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
pub const LAST_24_HOURS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')::BIGINT * 1000";
pub const LAST_7_DAYS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '7 days')::BIGINT * 1000";
pub const LAST_30_DAYS: &str = "EXTRACT(EPOCH FROM NOW() - INTERVAL '30 days')::BIGINT * 1000";
pub const DAY_IN_SECONDS: u64 = 60 * 60 * 24; // 86400
8 changes: 8 additions & 0 deletions database/src/structs/filter_requests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use sqlx::types::chrono::{DateTime, Utc};

#[derive(Debug, sqlx::FromRow)]
pub struct AggregatedRequestCount {
pub app_id: String,
pub bucket: DateTime<Utc>,
pub request_count: i64,
}
2 changes: 2 additions & 0 deletions database/src/structs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod client_data;
pub mod consts;
pub mod filter_requests;
pub mod request_status;
pub mod subscription;
pub mod time_filters;
29 changes: 29 additions & 0 deletions database/src/structs/time_filters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::consts::DAY_IN_SECONDS;
use sqlx::types::chrono::{NaiveDate, Utc};
use std::time::Duration;

pub enum TimeFilter {
Last24Hours,
Last7Days,
Last30Days,
}

impl TimeFilter {
pub fn to_date(&self) -> NaiveDate {
let duration = match self {
TimeFilter::Last24Hours => Duration::from_secs(DAY_IN_SECONDS),
TimeFilter::Last7Days => Duration::from_secs(7 * DAY_IN_SECONDS),
TimeFilter::Last30Days => Duration::from_secs(30 * DAY_IN_SECONDS),
};
// Subtract the duration from the current time and convert to NaiveDate
(Utc::now() - duration).date_naive()
}

pub fn bucket_size(&self) -> &'static str {
match self {
TimeFilter::Last24Hours => "1 hour",
TimeFilter::Last7Days => "1 day",
TimeFilter::Last30Days => "1 day",
}
}
}
48 changes: 35 additions & 13 deletions database/src/tables/registered_app/select.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::table_struct::{RegisteredApp, REGISTERED_APPS_TABLE_NAME};
use crate::structs::filter_requests::AggregatedRequestCount;
use crate::structs::time_filters::TimeFilter;
use crate::tables::requests::table_struct::REQUESTS_TABLE_NAME;
use crate::{db::Db, tables::requests::table_struct::Request};
use sqlx::query_as;
use sqlx::{query_as, Error};

impl Db {
pub async fn get_registered_app_by_app_id(
Expand Down Expand Up @@ -35,22 +37,42 @@ impl Db {
.await;
}

pub async fn get_requests_by_app_id_with_filter(
pub async fn get_aggregated_requests_by_app_id(
&self,
app_id: &String,
filter: &str,
) -> Result<Vec<Request>, sqlx::Error> {
app_id: &str,
filter: TimeFilter,
) -> Result<Vec<AggregatedRequestCount>, Error> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

// Correctly selecting the view based on the bucket_size
let (view_name, bucket, request_count) = match bucket_size {
"1 hour" => (
"hourly_requests_per_app",
"hourly_bucket",
"hourly_request_count",
),
"1 day" => (
"daily_requests_per_app",
"daily_bucket",
"daily_request_count",
),
// for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
};

let query = format!(
"SELECT r.* FROM {REQUESTS_TABLE_NAME} r
INNER JOIN sessions s ON r.session_id = s.session_id
WHERE s.app_id = $1 AND creation_timestamp >= {filter}
ORDER BY r.creation_timestamp DESC"
"SELECT app_id, {} as bucket, {} as request_count
FROM {}
WHERE app_id = $1 AND {} >= $2
ORDER BY {} DESC",
bucket, request_count, view_name, bucket, bucket
);
let typed_query = query_as::<_, Request>(&query);

return typed_query
.bind(&app_id)
sqlx::query_as::<_, AggregatedRequestCount>(&query)
.bind(app_id)
.bind(start_date)
.fetch_all(&self.connection_pool)
.await;
.await
}
}
Loading

0 comments on commit 5098101

Please sign in to comment.