Skip to content

Commit

Permalink
Merge pull request #138 from nightly-labs/rework-existing-handlers
Browse files Browse the repository at this point in the history
update existing event handlers
  • Loading branch information
Giems authored Mar 21, 2024
2 parents 1993203 + 3aa82de commit 4e3e3d3
Show file tree
Hide file tree
Showing 17 changed files with 443 additions and 92 deletions.
8 changes: 4 additions & 4 deletions database/src/tables/events/client_connect/update.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
db::Db,
structs::db_error::DbError,
structs::{db_error::DbError, session_type::SessionType},
tables::events::client_connect::table_struct::{
EVENT_CLIENT_CONNECT_KEYS, EVENT_CLIENT_CONNECT_TABLE_NAME,
},
Expand All @@ -16,7 +16,7 @@ impl Db {
session_id: &String,
wallet_name: &String,
wallet_type: &String,
session_type: &String,
session_type: &SessionType,
) -> Result<(), DbError> {
let query_body = format!(
"INSERT INTO {EVENT_CLIENT_CONNECT_TABLE_NAME} ({EVENT_CLIENT_CONNECT_KEYS}) VALUES ($1, $2, $3, NULL, $4, $5, $6, false)"
Expand All @@ -38,13 +38,13 @@ impl Db {
}
}

pub async fn update_event_client_connect_success(
pub async fn update_event_client_connect(
&self,
tx: &mut Transaction<'_, Postgres>,
client_id: &String,
session_id: &String,
success: bool,
new_addresses: Vec<String>,
new_addresses: &Vec<String>,
) -> Result<(), DbError> {
let query_body = format!(
"UPDATE {EVENT_CLIENT_CONNECT_TABLE_NAME} SET success = $1, addresses = $2 WHERE client_id = $3 AND session_id = $4 AND success = false"
Expand Down
11 changes: 5 additions & 6 deletions database/src/tables/events/events_index/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,27 @@ use crate::{
structs::{db_error::DbError, event_type::EventType},
tables::events::events_index::table_struct::{EVENTS_KEYS, EVENTS_TABLE_NAME},
};
use sqlx::Transaction;
use sqlx::{query, Postgres};
use sqlx::{query, Postgres, Row, Transaction};

impl Db {
pub async fn create_new_event_entry(
&self,
tx: &mut Transaction<'_, Postgres>,
app_id: &String,
event_type: &EventType,
) -> Result<(), DbError> {
) -> Result<i64, DbError> {
let query_body = format!(
"INSERT INTO {EVENTS_TABLE_NAME} ({EVENTS_KEYS}) VALUES (DEFAULT, $1, $2, DEFAULT)"
"INSERT INTO {EVENTS_TABLE_NAME} ({EVENTS_KEYS}) VALUES (DEFAULT, $1, $2, DEFAULT) RETURNING event_id"
);

let query_result = query(&query_body)
.bind(app_id)
.bind(event_type)
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await;

match query_result {
Ok(_) => Ok(()),
Ok(row) => Ok(row.get("event_id")),
Err(e) => Err(e).map_err(|e| e.into()),
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/src/http/cloud/events/events_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
processors::{
process_event_app_connect::process_event_app_connect,
process_event_app_disconnect::process_event_app_disconnect,
process_event_client_connect_init::process_event_client_connect_init,
process_event_client_connect::process_event_client_connect_init,
process_event_client_connect_resolve::process_event_client_connect_resolve,
process_event_client_disconnect::process_event_client_disconnect,
},
Expand Down Expand Up @@ -37,7 +37,7 @@ pub async fn process_event(
EventData::AppDisconnect(event) => {
process_event_app_disconnect(event, &event_payload.app_id, ip, db_connection).await;
}
EventData::ClientConnectInit(event) => {
EventData::ClientConnect(event) => {
process_event_client_connect_init(
event,
&event_payload.app_id,
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/cloud/events/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod process_event_app_connect;
pub mod process_event_app_disconnect;
pub mod process_event_client_connect_init;
pub mod process_event_client_connect;
pub mod process_event_client_connect_resolve;
pub mod process_event_client_disconnect;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
};
use database::{
db::Db,
structs::event_type::EventType,
tables::{
sessions::table_struct::DbNcSession,
utils::{get_current_datetime, get_date_time},
Expand All @@ -21,6 +22,9 @@ pub async fn process_event_app_connect(
geo_loc_requester: &Arc<GeolocationRequester>,
sessions: &Sessions,
) {
// Save event to Db
save_event_app_connect(db, app_id, event).await;

if event.new_session {
// New session, get the data from sessions and create a new session in the database
let session_data = {
Expand Down Expand Up @@ -98,17 +102,88 @@ pub async fn process_event_app_connect(
// Commit the transaction
if let Err(err) = tx.commit().await {
error!(
"Failed to commit transaction for new app connection event, app_id: [{}], ip: [{}], event: [{:?}], err: [{}]",
"Failed to commit transaction for new app connection, app_id: [{}], ip: [{}], event: [{:?}], err: [{}]",
app_id, ip, event, err
);
}
}
Err(err) => {
error!(
"Failed to create new connection event, app_id: [{}], ip: [{}], event: [{:?}], err: [{}]",
"Failed to create new connection, app_id: [{}], ip: [{}], event: [{:?}], err: [{}]",
app_id, ip, event, err
);
}
}
}
}

async fn save_event_app_connect(db: &Arc<Db>, app_id: &String, event: &AppConnectEvent) {
// Establish a new transaction
match db.connection_pool.begin().await {
Ok(mut tx) => {
// Create a new event index in the database
match db
.create_new_event_entry(&mut tx, &app_id, &EventType::AppConnect)
.await
{
Ok(event_id) => {
// Now create a new event app connect corresponding to the event
match db
.create_new_event_app_connect(
&mut tx,
event_id,
&event.session_id,
&event
.device_metadata
.to_string()
.unwrap_or("Failed to serialize device metadata".to_string()),
&event.language,
&event.timezone,
event.new_session,
)
.await
{
Ok(_) => {
// Commit the transaction
if let Err(err) = tx.commit().await {
error!(
"Failed to commit transaction for new app connection event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}

return;
}
Err(err) => {
error!(
"Failed to create new app connection event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
}
Err(err) => {
error!(
"Failed to create new event index, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}

// If we have not returned yet, then we have failed to save the event
// Rollback the transaction
if let Err(err) = tx.rollback().await {
error!(
"Failed to rollback transaction for new app connection event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
Err(err) => {
error!(
"Failed to create new transaction to save app connection event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::structs::cloud::cloud_events::event_types::app_disconnect_event::AppDisconnectEvent;
use database::db::Db;
use database::{db::Db, structs::event_type::EventType};
use log::error;
use std::{net::SocketAddr, sync::Arc};

Expand All @@ -9,6 +9,9 @@ pub async fn process_event_app_disconnect(
ip: SocketAddr,
db: &Arc<Db>,
) {
// Save event to Db
save_event_app_disconnect(db, app_id, event).await;

// Close app connection in the database
if let Err(err) = db.close_app_connection(&event.session_id, &app_id).await {
error!(
Expand All @@ -17,3 +20,63 @@ pub async fn process_event_app_disconnect(
);
}
}

async fn save_event_app_disconnect(db: &Arc<Db>, app_id: &String, event: &AppDisconnectEvent) {
// Establish a new transaction
match db.connection_pool.begin().await {
Ok(mut tx) => {
// Create a new event index in the database
match db
.create_new_event_entry(&mut tx, &app_id, &EventType::AppDisconnect)
.await
{
Ok(event_id) => {
// Now create a new event app disconnect corresponding to the event
match db
.create_new_event_app_disconnect(&mut tx, event_id, &event.session_id)
.await
{
Ok(_) => {
// Commit the transaction
if let Err(err) = tx.commit().await {
error!(
"Failed to commit transaction for new app disconnect event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}

return;
}
Err(err) => {
error!(
"Failed to create new app disconnect event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
}
Err(err) => {
error!(
"Failed to create new event index, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}

// If we have not returned yet, then we have failed to save the event
// Rollback the transaction
if let Err(err) = tx.rollback().await {
error!(
"Failed to rollback transaction for new app disconnect event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
Err(err) => {
error!(
"Failed to create new transaction to save app disconnect event, app_id: [{}], event: [{:?}], err: [{}]",
app_id, event, err
);
}
}
}
Loading

0 comments on commit 4e3e3d3

Please sign in to comment.