Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update api methods #119

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ serde = { workspace = true }
ts-rs = { workspace = true }
tokio = { workspace = true }
dotenvy = { workspace = true }
anyhow = { workspace = true }
axum = { workspace = true }
log = { workspace = true }
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/connections_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::ConnectionStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::ConnectionStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const CONNECTIONS_STATS_BASE_VIEW_NAME: &str = "connection_stats_per_app_and_network";
pub const CONNECTIONS_STATS_BASE_KEYS: [(&'static str, bool); 5] = [
Expand All @@ -20,7 +19,7 @@ impl Db {
app_id: &str,
network: Option<&str>,
filter: TimeFilter,
) -> Result<Vec<ConnectionStats>, Error> {
) -> Result<Vec<ConnectionStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -29,8 +28,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &CONNECTIONS_STATS_BASE_KEYS);
Expand All @@ -55,6 +53,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/requests_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::RequestsStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::RequestsStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const REQUESTS_STATS_BASE_VIEW_NAME: &str = "requests_stats_per_app";
pub const REQUESTS_STATS_BASE_KEYS: [(&'static str, bool); 4] = [
Expand All @@ -18,7 +17,7 @@ impl Db {
&self,
app_id: &str,
filter: TimeFilter,
) -> Result<Vec<RequestsStats>, Error> {
) -> Result<Vec<RequestsStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -27,8 +26,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &REQUESTS_STATS_BASE_KEYS);
Expand All @@ -48,6 +46,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
9 changes: 4 additions & 5 deletions database/src/aggregated_views_queries/sessions_stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{
db::Db,
structs::{filter_requests::SessionsStats, time_filters::TimeFilter},
structs::{db_error::DbError, filter_requests::SessionsStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

pub const SESSIONS_STATS_BASE_VIEW_NAME: &str = "sessions_stats_per_app";
pub const SESSIONS_STATS_BASE_KEYS: [(&'static str, bool); 4] = [
Expand All @@ -18,7 +17,7 @@ impl Db {
&self,
app_id: &str,
filter: TimeFilter,
) -> Result<Vec<SessionsStats>, Error> {
) -> Result<Vec<SessionsStats>, DbError> {
let start_date = filter.to_date();
let bucket_size = filter.bucket_size();

Expand All @@ -27,8 +26,7 @@ impl Db {
"1 hour" => "hourly",
"1 day" => "daily",
"1 month" => "monthly",
// TODO for now return WorkerCrashed but later create custom error
_ => return Err(Error::WorkerCrashed),
_ => return Err(DbError::DatabaseError("Invalid bucket size".to_string())),
};

let formatted_keys = format_view_keys(prefix, &SESSIONS_STATS_BASE_KEYS);
Expand All @@ -48,6 +46,7 @@ impl Db {
.bind(start_date)
.fetch_all(&self.connection_pool)
.await
.map_err(|e| e.into())
}
}

Expand Down
28 changes: 28 additions & 0 deletions database/src/structs/db_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::fmt;

#[derive(Debug)]
pub enum DbError {
DatabaseError(String),
SqlxDbError(sqlx::Error),
}

impl fmt::Display for DbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DbError::DatabaseError(ref err) => write!(f, "Database error: {}", err),
DbError::SqlxDbError(ref err) => write!(f, "Sqlx database error: {}", err),
}
}
}

impl From<sqlx::Error> for DbError {
fn from(error: sqlx::Error) -> DbError {
DbError::SqlxDbError(error)
}
}

impl From<String> for DbError {
fn from(error: String) -> DbError {
DbError::DatabaseError(error)
}
}
1 change: 1 addition & 0 deletions database/src/structs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod client_data;
pub mod consts;
pub mod db_error;
pub mod entity_type;
pub mod filter_requests;
pub mod privilege_level;
Expand Down
6 changes: 4 additions & 2 deletions database/src/tables/client_profiles/select.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use super::table_struct::ClientProfile;
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::tables::client_profiles::table_struct::CLIENT_PROFILES_TABLE_NAME;
use sqlx::query_as;

impl Db {
pub async fn get_profile_by_profile_id(
&self,
client_profile_id: i64,
) -> Result<ClientProfile, sqlx::Error> {
) -> Result<ClientProfile, DbError> {
let query =
format!("SELECT * FROM {CLIENT_PROFILES_TABLE_NAME} WHERE client_profile_id = $1");
let typed_query = query_as::<_, ClientProfile>(&query);

return typed_query
.bind(&client_profile_id)
.fetch_one(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}
}
13 changes: 9 additions & 4 deletions database/src/tables/client_profiles/update.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use super::table_struct::{CLIENT_PROFILES_KEYS, CLIENT_PROFILES_TABLE_NAME};
use crate::{db::Db, tables::client_profiles::table_struct::ClientProfile};
use crate::{
db::Db, structs::db_error::DbError, tables::client_profiles::table_struct::ClientProfile,
};
use sqlx::{query_as, Transaction};

impl Db {
pub async fn create_new_profile(
&self,
tx: Option<&mut Transaction<'_, sqlx::Postgres>>,
) -> Result<ClientProfile, sqlx::Error> {
) -> Result<ClientProfile, DbError> {
let query_body = format!(
"INSERT INTO {CLIENT_PROFILES_TABLE_NAME} ({CLIENT_PROFILES_KEYS}) VALUES (DEFAULT, DEFAULT) RETURNING {CLIENT_PROFILES_KEYS}"
);
let typed_query = query_as::<_, ClientProfile>(&query_body);

return match tx {
Some(tx) => typed_query.fetch_one(&mut **tx).await,
None => typed_query.fetch_one(&self.connection_pool).await,
Some(tx) => typed_query.fetch_one(&mut **tx).await.map_err(|e| e.into()),
None => typed_query
.fetch_one(&self.connection_pool)
.await
.map_err(|e| e.into()),
};
}
}
Expand Down
26 changes: 16 additions & 10 deletions database/src/tables/connection_events/select.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::table_struct::ConnectionEvent;
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::structs::entity_type::EntityType;
use crate::structs::filter_requests::DistinctConnectedClient;
use crate::tables::connection_events::table_struct::CONNECTION_EVENTS_TABLE_NAME;
Expand All @@ -9,20 +10,21 @@ impl Db {
pub async fn get_connection_events_by_session_id(
&self,
session_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE session_id = $1");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&session_id)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_client_profile_id(
&self,
client_profile_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!(
"SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2"
);
Expand All @@ -32,40 +34,43 @@ impl Db {
.bind(&client_profile_id)
.bind(&EntityType::Client)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_app_id(
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE app_id = $1");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&app_id)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_connection_events_by_app(
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
) -> Result<Vec<ConnectionEvent>, DbError> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&app_id)
.bind(&EntityType::App)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}

pub async fn get_all_app_distinct_users(
&self,
app_id: &String,
) -> Result<Vec<DistinctConnectedClient>, sqlx::Error> {
) -> Result<Vec<DistinctConnectedClient>, DbError> {
let query = format!(
"SELECT pk.public_key, MIN(ce.connected_at) AS first_connection, MAX(ce.connected_at) AS last_connection
FROM {CONNECTION_EVENTS_TABLE_NAME} ce
Expand All @@ -78,6 +83,7 @@ impl Db {
.bind(app_id)
.bind(EntityType::Client)
.fetch_all(&self.connection_pool)
.await;
.await
.map_err(|e| e.into());
}
}
17 changes: 9 additions & 8 deletions database/src/tables/connection_events/update.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db::Db;
use crate::structs::db_error::DbError;
use crate::structs::entity_type::EntityType;
use crate::tables::connection_events::table_struct::{
CONNECTION_EVENTS_KEYS_KEYS, CONNECTION_EVENTS_TABLE_NAME,
Expand All @@ -14,7 +15,7 @@ impl Db {
connection_id: &String,
app_id: &String,
network: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"INSERT INTO {CONNECTION_EVENTS_TABLE_NAME} ({CONNECTION_EVENTS_KEYS_KEYS}) VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, NOW(), NULL)"
);
Expand All @@ -31,7 +32,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -42,7 +43,7 @@ impl Db {
session_id: &String,
client_profile_id: i64,
network: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"INSERT INTO {CONNECTION_EVENTS_TABLE_NAME} ({CONNECTION_EVENTS_KEYS_KEYS}) VALUES (DEFAULT, $1, $2, NULL, $3, $4, $5, NOW(), NULL)"
);
Expand All @@ -58,7 +59,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -68,7 +69,7 @@ impl Db {
app_id: &String,
connection_id: &String,
session_id: &String,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"UPDATE {CONNECTION_EVENTS_TABLE_NAME}
SET disconnected_at = NOW()
Expand All @@ -85,7 +86,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}

Expand All @@ -95,7 +96,7 @@ impl Db {
app_id: &String,
session_id: &String,
client_profile_id: i64,
) -> Result<(), sqlx::Error> {
) -> Result<(), DbError> {
let query_body = format!(
"UPDATE {CONNECTION_EVENTS_TABLE_NAME}
SET disconnected_at = NOW()
Expand All @@ -112,7 +113,7 @@ impl Db {

match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
Err(e) => Err(e).map_err(|e| e.into()),
}
}
}
Expand Down
Loading
Loading