Skip to content

Commit

Permalink
Merge pull request #119 from nightly-labs/update-api-methods
Browse files Browse the repository at this point in the history
Update api methods
  • Loading branch information
Giems authored Mar 7, 2024
2 parents 7854de6 + 8af6d24 commit ffcba1a
Show file tree
Hide file tree
Showing 28 changed files with 270 additions and 160 deletions.
3 changes: 1 addition & 2 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ serde = { workspace = true }
ts-rs = { workspace = true }
tokio = { workspace = true }
dotenvy = { workspace = true }
anyhow = { workspace = true }
axum = { workspace = true }
log = { workspace = true }
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/connections_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::ConnectionStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::ConnectionStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const CONNECTIONS_STATS_BASE_VIEW_NAME: &str = "connection_stats_per_app_and_network";
pub const CONNECTIONS_STATS_BASE_KEYS: [(&'static str, bool); 5] = [
Expand All @@ -20,7 +19,7 @@ impl Db {
app_id: &str,
network: Option<&str>,
filter: TimeFilter,
) -> Result<Vec<ConnectionStats>, Error> {
) -> Result<Vec<ConnectionStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -29,8 +28,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &CONNECTIONS_STATS_BASE_KEYS);
Expand All @@ -55,6 +53,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/requests_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::RequestsStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::RequestsStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const REQUESTS_STATS_BASE_VIEW_NAME: &str = "requests_stats_per_app";
pub const REQUESTS_STATS_BASE_KEYS: [(&'static str, bool); 4] = [
Expand All @@ -18,7 +17,7 @@ impl Db {
&self,
app_id: &str,
filter: TimeFilter,
) -> Result<Vec<RequestsStats>, Error> {
) -> Result<Vec<RequestsStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -27,8 +26,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &REQUESTS_STATS_BASE_KEYS);
Expand All @@ -48,6 +46,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/sessions_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::SessionsStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::SessionsStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const SESSIONS_STATS_BASE_VIEW_NAME: &str = "sessions_stats_per_app";
pub const SESSIONS_STATS_BASE_KEYS: [(&'static str, bool); 4] = [
Expand All @@ -18,7 +17,7 @@ impl Db {
&self,
app_id: &str,
filter: TimeFilter,
) -> Result<Vec<SessionsStats>, Error> {
) -> Result<Vec<SessionsStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -27,8 +26,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &SESSIONS_STATS_BASE_KEYS);
Expand All @@ -48,6 +46,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
28 changes: 28 additions & 0 deletions database/src/structs/db_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::fmt;

#[derive(Debug)]
pub enum DbError {
DatabaseError(String),
SqlxDbError(sqlx::Error),
}

impl fmt::Display for DbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DbError::DatabaseError(ref err) => write!(f, "Database error: {}", err),
DbError::SqlxDbError(ref err) => write!(f, "Sqlx database error: {}", err),
}
}
}

impl From<sqlx::Error> for DbError {
fn from(error: sqlx::Error) -> DbError {
DbError::SqlxDbError(error)
}
}

impl From<String> for DbError {
fn from(error: String) -> DbError {
DbError::DatabaseError(error)
}
}
1 change: 1 addition & 0 deletions database/src/structs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod client_data;
pub mod consts;
pub mod db_error;
pub mod entity_type;
pub mod filter_requests;
pub mod privilege_level;
Expand Down
6 changes: 4 additions & 2 deletions database/src/tables/client_profiles/select.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use super::table_struct::ClientProfile;
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::tables::client_profiles::table_struct::CLIENT_PROFILES_TABLE_NAME;
use sqlx::query_as;

impl Db {
pub async fn get_profile_by_profile_id(
&self,
client_profile_id: i64,
) -> Result<ClientProfile, sqlx::Error> {
) -> Result<ClientProfile, DbError> {
let query =
format!("SELECT * FROM {CLIENT_PROFILES_TABLE_NAME} WHERE client_profile_id = $1");
let typed_query = query_as::<_, ClientProfile>(&query);

return typed_query
.bind(&client_profile_id)
.fetch_one(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}
}
13 changes: 9 additions & 4 deletions database/src/tables/client_profiles/update.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use super::table_struct::{CLIENT_PROFILES_KEYS, CLIENT_PROFILES_TABLE_NAME};
use crate::{db::Db, tables::client_profiles::table_struct::ClientProfile};
use crate::{
db::Db, structs::db_error::DbError, tables::client_profiles::table_struct::ClientProfile,
};
use sqlx::{query_as, Transaction};

impl Db {
pub async fn create_new_profile(
&self,
tx: Option<&mut Transaction<'_, sqlx::Postgres>>,
) -> Result<ClientProfile, sqlx::Error> {
) -> Result<ClientProfile, DbError> {
let query_body = format!(
"INSERT INTO {CLIENT_PROFILES_TABLE_NAME} ({CLIENT_PROFILES_KEYS}) VALUES (DEFAULT, DEFAULT) RETURNING {CLIENT_PROFILES_KEYS}"
);
let typed_query = query_as::<_, ClientProfile>(&query_body);

return match tx {
Some(tx) => typed_query.fetch_one(&mut **tx).await,
None => typed_query.fetch_one(&self.connection_pool).await,
Some(tx) => typed_query.fetch_one(&mut **tx).await.map_err(|e| e.into()),
None => typed_query
.fetch_one(&self.connection_pool)
.await
.map_err(|e| e.into()),
};
}
}
Expand Down
26 changes: 16 additions & 10 deletions database/src/tables/connection_events/select.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::table_struct::ConnectionEvent;
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::structs::entity_type::EntityType;
use crate::structs::filter_requests::DistinctConnectedClient;
use crate::tables::connection_events::table_struct::CONNECTION_EVENTS_TABLE_NAME;
Expand All @@ -9,20 +10,21 @@ impl Db {
pub async fn get_connection_events_by_session_id(
&self,
session_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE session_id = $1");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&session_id)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_client_profile_id(
&self,
client_profile_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!(
"SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2"
);
Expand All @@ -32,40 +34,43 @@ impl Db {
.bind(&client_profile_id)
.bind(&EntityType::Client)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_app_id(
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE app_id = $1");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&app_id)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_app(
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&app_id)
.bind(&EntityType::App)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_all_app_distinct_users(
&self,
app_id: &String,
) -> Result<Vec<DistinctConnectedClient>, sqlx::Error> {
) -> Result<Vec<DistinctConnectedClient>, DbError> {
let query = format!(
"SELECT pk.public_key, MIN(ce.connected_at) AS first_connection, MAX(ce.connected_at) AS last_connection
FROM {CONNECTION_EVENTS_TABLE_NAME} ce
Expand All @@ -78,6 +83,7 @@ impl Db {
.bind(app_id)
.bind(EntityType::Client)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}
}
17 changes: 9 additions & 8 deletions database/src/tables/connection_events/update.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::structs::entity_type::EntityType;
use crate::tables::connection_events::table_struct::{
CONNECTION_EVENTS_KEYS_KEYS, CONNECTION_EVENTS_TABLE_NAME,
Expand All @@ -14,7 +15,7 @@ impl Db {
connection_id: &String,
app_id: &String,
network: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"INSERT INTO {CONNECTION_EVENTS_TABLE_NAME} ({CONNECTION_EVENTS_KEYS_KEYS}) VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, NOW(), NULL)"
);
Expand All @@ -31,7 +32,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -42,7 +43,7 @@ impl Db {
session_id: &String,
client_profile_id: i64,
network: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"INSERT INTO {CONNECTION_EVENTS_TABLE_NAME} ({CONNECTION_EVENTS_KEYS_KEYS}) VALUES (DEFAULT, $1, $2, NULL, $3, $4, $5, NOW(), NULL)"
);
Expand All @@ -58,7 +59,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -68,7 +69,7 @@ impl Db {
app_id: &String,
connection_id: &String,
session_id: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"UPDATE {CONNECTION_EVENTS_TABLE_NAME}
SET disconnected_at = NOW()
Expand All @@ -85,7 +86,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -95,7 +96,7 @@ impl Db {
app_id: &String,
session_id: &String,
client_profile_id: i64,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"UPDATE {CONNECTION_EVENTS_TABLE_NAME}
SET disconnected_at = NOW()
Expand All @@ -112,7 +113,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}
}
Expand Down
Loading

0 comments on commit ffcba1a

Please sign in to comment.