From 5ebde56f5519f6303a5393e976f08bee4706639e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGiems=E2=80=9D?= <“hubert.wabia@gmail.com”> Date: Tue, 6 Feb 2024 15:37:17 +0100 Subject: [PATCH] refactor client ws handler --- server/src/client/client_handler.rs | 437 ------------------ server/src/client/mod.rs | 1 - server/src/router.rs | 14 +- server/src/structs/session.rs | 28 +- server/src/ws/app_handler/handler.rs | 2 +- server/src/ws/client_handler/handler.rs | 259 +++++++++++ .../methods/disconnect_sessions.rs | 34 ++ .../src/ws/client_handler/methods/get_info.rs | 53 +++ .../methods/get_pending_requests.rs | 56 +++ server/src/ws/client_handler/methods/mod.rs | 5 + .../client_handler/methods/process_connect.rs | 62 +++ .../methods/process_new_payload_reply.rs | 72 +++ server/src/ws/client_handler/mod.rs | 2 + server/src/ws/mod.rs | 1 + 14 files changed, 580 insertions(+), 446 deletions(-) delete mode 100644 server/src/client/client_handler.rs create mode 100644 server/src/ws/client_handler/handler.rs create mode 100644 server/src/ws/client_handler/methods/disconnect_sessions.rs create mode 100644 server/src/ws/client_handler/methods/get_info.rs create mode 100644 server/src/ws/client_handler/methods/get_pending_requests.rs create mode 100644 server/src/ws/client_handler/methods/mod.rs create mode 100644 server/src/ws/client_handler/methods/process_connect.rs create mode 100644 server/src/ws/client_handler/methods/process_new_payload_reply.rs create mode 100644 server/src/ws/client_handler/mod.rs diff --git a/server/src/client/client_handler.rs b/server/src/client/client_handler.rs deleted file mode 100644 index 3c7a504e..00000000 --- a/server/src/client/client_handler.rs +++ /dev/null @@ -1,437 +0,0 @@ -use crate::{ - errors::NightlyError, - state::{ - ClientSockets, ClientToSessions, DisconnectUser, ModifySession, SendToClient, Sessions, - }, - structs::{ - app_messages::{ - app_messages::ServerToApp, payload::ResponsePayload, - user_connected_event::UserConnectedEvent, - user_disconnected_event::UserDisconnectedEvent, - }, - client_messages::{ - client_initialize::ClientInitializeResponse, - client_messages::{ClientToServer, ServerToClient}, - connect::ConnectResponse, - drop_sessions::DropSessionsResponse, - get_info::GetInfoResponse, - get_pending_requests::GetPendingRequestsResponse, - get_sessions::GetSessionsResponse, - }, - common::{AckMessage, ErrorMessage, SessionStatus}, - }, -}; -use axum::{ - extract::{ - ws::{Message, WebSocket}, - ConnectInfo, State, WebSocketUpgrade, - }, - response::Response, -}; -use futures::StreamExt; -use log::{debug, info}; -use std::net::SocketAddr; -use tokio::sync::RwLock; - -pub async fn on_new_client_connection( - ConnectInfo(ip): ConnectInfo, - State(sessions): State, - State(client_sockets): State, - State(client_to_sessions): State, - ws: WebSocketUpgrade, -) -> Response { - let ip = ip.clone().to_string().clone(); - ws.on_upgrade(move |socket| async move { - debug!("OPEN client connection from {}", ip); - client_handler(socket, sessions, client_sockets, client_to_sessions).await; - debug!("CLOSE client connection from {}", ip); - }) -} - -pub async fn client_handler( - socket: WebSocket, - sessions: Sessions, - client_sockets: ClientSockets, - client_to_sessions: ClientToSessions, -) { - let (sender, mut receiver) = socket.split(); - // Handle the new app connection here - // Wait for initialize message - let client_id: String = loop { - let msg = match receiver.next().await { - Some(msg) => match msg { - Ok(msg) => msg, - Err(_e) => { - return; - } - }, - None => { - return; - } - }; - let app_msg = match msg { - Message::Text(data) => match serde_json::from_str::(&data) { - Ok(app_msg) => app_msg, - Err(_) => continue, - }, - Message::Binary(_) => continue, - Message::Close(None) | Message::Close(Some(_)) => { - return; - } - Message::Ping(_) => { - continue; - } - Message::Pong(_) => { - continue; - } - }; - match app_msg { - ClientToServer::ClientInitializeRequest(connect_request) => { - // Insert client socket - { - let mut client_sockets_write = client_sockets.write().await; - client_sockets_write - .insert(connect_request.client_id.clone(), RwLock::new(sender)); - } - // Send response - let client_msg = - ServerToClient::ClientInitializeResponse(ClientInitializeResponse { - response_id: connect_request.response_id, - }); - client_sockets - .send_to_client(connect_request.client_id.clone(), client_msg) - .await - .unwrap_or_default(); - break connect_request.client_id; - } - _ => { - continue; - } - } - }; - info!("Client connected: {}", client_id); - // Main loop request handler - loop { - let sessions = sessions.clone(); - let msg = match receiver.next().await { - Some(msg) => match msg { - Ok(msg) => msg, - Err(_e) => { - let user_disconnected_event = - ServerToApp::UserDisconnectedEvent(UserDisconnectedEvent {}); - let user_sessions = client_to_sessions.get_sessions(client_id.clone()).await; - for session_id in user_sessions { - let mut sessions = sessions.write().await; - let session = match sessions.get_mut(&session_id) { - Some(session) => session, - None => { - // Should never happen - continue; - } - }; - session - .send_to_app(user_disconnected_event.clone()) - .await - .unwrap_or_default(); - session.update_status(SessionStatus::UserDisconnected); - } - // Remove client socket - client_sockets - .close_client_socket(client_id.clone()) - .await - .unwrap_or_default(); - return; - } - }, - None => { - let user_disconnected_event = - ServerToApp::UserDisconnectedEvent(UserDisconnectedEvent {}); - let user_sessions = client_to_sessions.get_sessions(client_id.clone()).await; - for session_id in user_sessions { - let mut sessions = sessions.write().await; - let session = match sessions.get_mut(&session_id) { - Some(session) => session, - None => { - continue; - } - }; - session - .send_to_app(user_disconnected_event.clone()) - .await - .unwrap_or_default(); - session.update_status(SessionStatus::UserDisconnected); - } - // Remove client socket - client_sockets - .close_client_socket(client_id.clone()) - .await - .unwrap_or_default(); - return; - } - }; - let app_msg = match msg { - Message::Text(data) => match serde_json::from_str::(&data) { - Ok(app_msg) => app_msg, - Err(_) => continue, - }, - Message::Binary(_) => continue, - Message::Close(None) | Message::Close(Some(_)) => { - let user_disconnected_event = - ServerToApp::UserDisconnectedEvent(UserDisconnectedEvent {}); - let user_sessions = client_to_sessions.get_sessions(client_id.clone()).await; - for session_id in user_sessions { - let mut sessions = sessions.write().await; - let session = match sessions.get_mut(&session_id) { - Some(session) => session, - None => { - continue; - } - }; - session - .send_to_app(user_disconnected_event.clone()) - .await - .unwrap_or_default(); - session.update_status(SessionStatus::UserDisconnected); - } - // Remove client socket - client_sockets - .close_client_socket(client_id.clone()) - .await - .unwrap_or_default(); - return; - } - Message::Ping(_) => { - continue; - } - Message::Pong(_) => { - continue; - } - }; - info!("Client {} new msg {:?}", client_id, app_msg); - - match app_msg { - ClientToServer::ConnectRequest(connect_request) => { - let mut sessions = sessions.write().await; - let session = match sessions.get_mut(&connect_request.session_id) { - Some(session) => session, - None => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: connect_request.response_id, - error: NightlyError::SessionDoesNotExist.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - - info!( - "Client {} session does not exist {}", - client_id, connect_request.session_id - ); - - continue; - } - }; - - // Insert user socket - session.update_status(SessionStatus::ClientConnected); - session.client_state.device = connect_request.device.clone(); - session.client_state.connected_public_keys = connect_request.public_keys.clone(); - session.client_state.metadata = connect_request.metadata.clone(); - session.client_state.client_id = Some(connect_request.client_id.clone()); - // Setup notification - match &connect_request.notification { - Some(notification) => { - session.notification = Some(notification.clone()); - } - None => { - // skip - } - } - let app_event = ServerToApp::UserConnectedEvent(UserConnectedEvent { - public_keys: connect_request.public_keys, - metadata: connect_request.metadata, - }); - session.send_to_app(app_event).await.unwrap_or_default(); - - // Insert new session id into client_to_sessions - client_to_sessions - .add_session( - connect_request.client_id.clone(), - connect_request.session_id.clone(), - ) - .await; - - let client_reponse = ServerToClient::ConnectResponse(ConnectResponse { - response_id: connect_request.response_id, - }); - client_sockets - .send_to_client(client_id.clone(), client_reponse) - .await - .unwrap_or_default(); - } - ClientToServer::NewPayloadEventReply(new_payload_event_reply) => { - let mut sessions = sessions.write().await; - let session = match sessions.get_mut(&new_payload_event_reply.session_id) { - Some(session) => session, - None => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: new_payload_event_reply.response_id, - error: NightlyError::SessionDoesNotExist.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - info!( - "Client {} session does not exist {}", - client_id, new_payload_event_reply.session_id - ); - continue; - } - }; - match session - .pending_requests - .remove(&new_payload_event_reply.request_id) - { - Some(_) => {} - None => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: new_payload_event_reply.response_id, - error: NightlyError::RequestDoesNotExist.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - continue; - } - }; - // Send to app - let app_msg = ServerToApp::ResponsePayload(ResponsePayload { - response_id: new_payload_event_reply.request_id.clone(), - content: new_payload_event_reply.content.clone(), - }); - session.send_to_app(app_msg).await.unwrap_or_default(); - - let client_msg = ServerToClient::AckMessage(AckMessage { - response_id: new_payload_event_reply.response_id, - }); - client_sockets - .send_to_client(client_id.clone(), client_msg) - .await - .unwrap_or_default(); - } - ClientToServer::GetInfoRequest(get_info_request) => { - let sessions = sessions.read().await; - let session = match sessions.get(&get_info_request.session_id) { - Some(session) => session, - None => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: get_info_request.response_id, - error: NightlyError::SessionDoesNotExist.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - info!( - "Client {} session does not exist {}", - client_id, get_info_request.session_id - ); - continue; - } - }; - let response = ServerToClient::GetInfoResponse(GetInfoResponse { - response_id: get_info_request.response_id, - network: session.network.clone(), - version: session.version.clone(), - app_metadata: session.app_state.metadata.clone(), - }); - client_sockets - .send_to_client(client_id.clone(), response) - .await - .unwrap_or_default(); - } - ClientToServer::GetPendingRequestsRequest(get_pending_requests_request) => { - let sessions = sessions.read().await; - let session = match sessions.get(&get_pending_requests_request.session_id) { - Some(session) => session, - None => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: get_pending_requests_request.response_id, - error: NightlyError::SessionDoesNotExist.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - info!( - "Client {} session does not exist {}", - client_id, get_pending_requests_request.session_id - ); - continue; - } - }; - let pending_requests = session - .pending_requests - .clone() - .iter() - .map(|(_, v)| v.clone()) - .collect::>(); - let response = - ServerToClient::GetPendingRequestsResponse(GetPendingRequestsResponse { - requests: pending_requests, - response_id: get_pending_requests_request.response_id, - }); - client_sockets - .send_to_client(client_id.clone(), response) - .await - .unwrap_or_default(); - } - ClientToServer::GetSessionsRequest(get_sessions_request) => { - let sessions = client_to_sessions.get_sessions(client_id.clone()).await; - let response = ServerToClient::GetSessionsResponse(GetSessionsResponse { - sessions, - response_id: get_sessions_request.response_id, - }); - client_sockets - .send_to_client(client_id.clone(), response) - .await - .unwrap_or_default(); - } - ClientToServer::DropSessionsRequest(drop_sessions_request) => { - let mut dropped_sessions = Vec::new(); - // TODO handle disconnecting app - for session_id in drop_sessions_request.sessions { - if sessions.disconnect_user(session_id.clone()).await.is_ok() { - dropped_sessions.push(session_id.clone()); - }; - - client_to_sessions - .remove_session(client_id.clone(), session_id.clone()) - .await; - } - let response = ServerToClient::DropSessionsResponse(DropSessionsResponse { - dropped_sessions, - response_id: drop_sessions_request.response_id, - }); - client_sockets - .send_to_client(client_id.clone(), response) - .await - .unwrap_or_default(); - } - ClientToServer::ClientInitializeRequest(_) => { - let error = ServerToClient::ErrorMessage(ErrorMessage { - response_id: "".to_string(), - error: NightlyError::ClientAlreadyInitialized.to_string(), - }); - client_sockets - .send_to_client(client_id.clone(), error) - .await - .unwrap_or_default(); - } - } - info!("Client {} msg handled", client_id); - } -} diff --git a/server/src/client/mod.rs b/server/src/client/mod.rs index 3c53773f..6cef9d5f 100644 --- a/server/src/client/mod.rs +++ b/server/src/client/mod.rs @@ -1,4 +1,3 @@ -pub mod client_handler; pub mod connect_session; pub mod drop_sessions; pub mod get_pending_request; diff --git a/server/src/router.rs b/server/src/router.rs index a280755b..541969b3 100644 --- a/server/src/router.rs +++ b/server/src/router.rs @@ -10,18 +10,20 @@ use tracing_subscriber::EnvFilter; use crate::{ client::{ - client_handler::on_new_client_connection, connect_session::connect_session, - drop_sessions::drop_sessions, get_pending_request::get_pending_request, - get_pending_requests::get_pending_requests, get_session_info::get_session_info, - get_sessions::get_sessions, get_wallets_metadata::get_wallets_metadata, - resolve_request::resolve_request, + connect_session::connect_session, drop_sessions::drop_sessions, + get_pending_request::get_pending_request, get_pending_requests::get_pending_requests, + get_session_info::get_session_info, get_sessions::get_sessions, + get_wallets_metadata::get_wallets_metadata, resolve_request::resolve_request, }, handle_error::handle_error, sesssion_cleaner::start_cleaning_sessions, state::ServerState, structs::http_endpoints::HttpEndpoint, utils::get_cors, - ws::app_handler::handler::on_new_app_connection, + ws::{ + app_handler::handler::on_new_app_connection, + client_handler::handler::on_new_client_connection, + }, }; use tower_http::trace::TraceLayer; pub async fn get_router() -> Router { diff --git a/server/src/structs/session.rs b/server/src/structs/session.rs index 9a020cc4..4b5f61bf 100644 --- a/server/src/structs/session.rs +++ b/server/src/structs/session.rs @@ -3,7 +3,11 @@ use std::collections::HashMap; use crate::{state::ClientId, utils::get_timestamp_in_milliseconds}; use super::{ - app_messages::{app_messages::ServerToApp, initialize::InitializeRequest}, + app_messages::{ + app_messages::ServerToApp, initialize::InitializeRequest, + user_connected_event::UserConnectedEvent, + }, + client_messages::connect::ConnectRequest, common::{AppMetadata, Device, Network, Notification, PendingRequest, SessionStatus, Version}, }; use anyhow::Result; @@ -109,6 +113,28 @@ impl Session { creation_timestamp: get_timestamp_in_milliseconds(), } } + + pub async fn connect_user(&mut self, connect_request: &ConnectRequest) { + // Update session status + self.update_status(SessionStatus::ClientConnected); + + // Update client state + self.client_state.device = connect_request.device.clone(); + self.client_state.connected_public_keys = connect_request.public_keys.clone(); + self.client_state.metadata = connect_request.metadata.clone(); + self.client_state.client_id = Some(connect_request.client_id.clone()); + + if let Some(notification) = &connect_request.notification { + self.notification = Some(notification.clone()); + } + + // Send user connected event to app + let app_event = ServerToApp::UserConnectedEvent(UserConnectedEvent { + public_keys: connect_request.public_keys.clone(), + metadata: connect_request.metadata.clone(), + }); + self.send_to_app(app_event).await.unwrap_or_default(); + } } #[derive(Debug)] pub struct AppState { diff --git a/server/src/ws/app_handler/handler.rs b/server/src/ws/app_handler/handler.rs index 4bc4416e..3d551385 100644 --- a/server/src/ws/app_handler/handler.rs +++ b/server/src/ws/app_handler/handler.rs @@ -18,7 +18,7 @@ use axum::{ response::Response, }; use futures::StreamExt; -use log::{debug, error, warn}; +use log::{debug, warn}; use std::net::SocketAddr; pub async fn on_new_app_connection( diff --git a/server/src/ws/client_handler/handler.rs b/server/src/ws/client_handler/handler.rs new file mode 100644 index 00000000..0f1aa2b0 --- /dev/null +++ b/server/src/ws/client_handler/handler.rs @@ -0,0 +1,259 @@ +use crate::{ + errors::NightlyError, + state::{ClientSockets, ClientToSessions, ModifySession, SendToClient, Sessions}, + structs::{ + client_messages::{ + client_initialize::ClientInitializeResponse, + client_messages::{ClientToServer, ServerToClient}, + drop_sessions::DropSessionsResponse, + get_sessions::GetSessionsResponse, + }, + common::ErrorMessage, + }, + ws::client_handler::methods::{ + disconnect_sessions::disconnect_client_sessions, get_info::process_get_info, + get_pending_requests::process_get_pending_requests, + process_connect::process_client_connect, + process_new_payload_reply::process_new_payload_event_reply, + }, +}; +use axum::{ + extract::{ + ws::{Message, WebSocket}, + ConnectInfo, State, WebSocketUpgrade, + }, + response::Response, +}; +use futures::StreamExt; +use log::{debug, info, warn}; +use std::net::SocketAddr; +use tokio::sync::RwLock; + +pub async fn on_new_client_connection( + ConnectInfo(ip): ConnectInfo, + State(sessions): State, + State(client_sockets): State, + State(client_to_sessions): State, + ws: WebSocketUpgrade, +) -> Response { + let ip = ip.clone().to_string().clone(); + ws.on_upgrade(move |socket| async move { + debug!("OPEN client connection from {}", ip); + client_handler(socket, sessions, client_sockets, client_to_sessions).await; + debug!("CLOSE client connection from {}", ip); + }) +} + +pub async fn client_handler( + socket: WebSocket, + sessions: Sessions, + client_sockets: ClientSockets, + client_to_sessions: ClientToSessions, +) { + let (sender, mut receiver) = socket.split(); + let sessions = sessions.clone(); + // Handle the new app connection here + // Wait for initialize message + let client_id: String = loop { + // If stream is closed, or message is not received, return + let msg = match receiver.next().await { + Some(Ok(msg)) => msg, + Some(Err(_)) | None => { + return; + } + }; + + let app_msg = match msg { + Message::Text(data) => match serde_json::from_str::(&data) { + Ok(app_msg) => app_msg, + Err(_) => continue, + }, + Message::Close(None) | Message::Close(Some(_)) => { + return; + } + _ => continue, + }; + + match app_msg { + ClientToServer::ClientInitializeRequest(connect_request) => { + // Insert client socket + { + let mut client_sockets_write = client_sockets.write().await; + client_sockets_write + .insert(connect_request.client_id.clone(), RwLock::new(sender)); + } + // Send response + let client_msg = + ServerToClient::ClientInitializeResponse(ClientInitializeResponse { + response_id: connect_request.response_id, + }); + client_sockets + .send_to_client(connect_request.client_id.clone(), client_msg) + .await + .unwrap_or_default(); + break connect_request.client_id; + } + _ => { + continue; + } + } + }; + info!("Client connected: {}", client_id); + + // Main loop request handler + loop { + let msg = match receiver.next().await { + Some(Ok(msg)) => msg, + Some(Err(_)) | None => { + // Disconnect all user sessions + if let Err(err) = disconnect_client_sessions( + client_id.clone(), + &sessions, + &client_to_sessions, + None, + ) + .await + { + warn!("Error disconnecting session: {}", err); + } + + // Remove client socket + client_sockets + .close_client_socket(client_id.clone()) + .await + .unwrap_or_default(); + + return; + } + }; + + let app_msg = match msg { + Message::Text(data) => match serde_json::from_str::(&data) { + Ok(app_msg) => app_msg, + Err(_) => continue, + }, + Message::Close(None) | Message::Close(Some(_)) => { + // Disconnect all user sessions + if let Err(err) = disconnect_client_sessions( + client_id.clone(), + &sessions, + &client_to_sessions, + None, + ) + .await + { + warn!("Error disconnecting session: {}", err); + } + // Remove client socket + client_sockets + .close_client_socket(client_id.clone()) + .await + .unwrap_or_default(); + + return; + } + _ => continue, + }; + info!("Client {} new msg {:?}", client_id, app_msg); + + match app_msg { + ClientToServer::ConnectRequest(connect_request) => { + if let Err(err) = process_client_connect( + &client_id, + &sessions, + &client_sockets, + &client_to_sessions, + connect_request, + ) + .await + { + warn!("Error processing connect request: {}", err); + } + } + ClientToServer::NewPayloadEventReply(new_payload_event_reply) => { + if let Err(err) = process_new_payload_event_reply( + &client_id, + &sessions, + &client_sockets, + new_payload_event_reply, + ) + .await + { + warn!("Error processing new payload event reply: {}", err); + } + } + ClientToServer::GetInfoRequest(get_info_request) => { + if let Err(err) = + process_get_info(&client_id, &sessions, &client_sockets, get_info_request).await + { + warn!("Error processing get info request: {}", err); + } + } + ClientToServer::GetPendingRequestsRequest(get_pending_requests_request) => { + if let Err(err) = process_get_pending_requests( + &client_id, + &sessions, + &client_sockets, + get_pending_requests_request, + ) + .await + { + warn!("Error processing get pending requests request: {}", err); + } + } + ClientToServer::GetSessionsRequest(get_sessions_request) => { + let sessions = client_to_sessions.get_sessions(client_id.clone()).await; + let response = ServerToClient::GetSessionsResponse(GetSessionsResponse { + sessions, + response_id: get_sessions_request.response_id, + }); + client_sockets + .send_to_client(client_id.clone(), response) + .await + .unwrap_or_default(); + } + ClientToServer::DropSessionsRequest(drop_sessions_request) => { + match disconnect_client_sessions( + client_id.clone(), + &sessions, + &client_to_sessions, + Some(&drop_sessions_request.sessions), + ) + .await + { + Ok(dropped_sessions) => { + let response = ServerToClient::DropSessionsResponse(DropSessionsResponse { + dropped_sessions, + response_id: drop_sessions_request.response_id, + }); + client_sockets + .send_to_client(client_id.clone(), response) + .await + .unwrap_or_default(); + } + Err(err) => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: drop_sessions_request.response_id, + error: err.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + } + } + } + ClientToServer::ClientInitializeRequest(_) => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: "".to_string(), + error: NightlyError::ClientAlreadyInitialized.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + } + } + info!("Client {} msg handled", client_id); + } +} diff --git a/server/src/ws/client_handler/methods/disconnect_sessions.rs b/server/src/ws/client_handler/methods/disconnect_sessions.rs new file mode 100644 index 00000000..01e56920 --- /dev/null +++ b/server/src/ws/client_handler/methods/disconnect_sessions.rs @@ -0,0 +1,34 @@ +use crate::state::{ClientToSessions, DisconnectUser, ModifySession, Sessions}; +use anyhow::{bail, Result}; + +pub async fn disconnect_client_sessions( + client_id: String, + sessions: &Sessions, + client_to_sessions: &ClientToSessions, + sessions_list: Option<&Vec>, +) -> Result> { + // If not specified get all sessions for the client + let user_sessions = match sessions_list { + Some(sessions) => sessions.clone(), + None => client_to_sessions.get_sessions(client_id.clone()).await, + }; + + let mut dropped_sessions = Vec::new(); + + // Send user disconnected event to all sessions + for session_id in user_sessions { + if sessions.disconnect_user(session_id.clone()).await.is_ok() { + dropped_sessions.push(session_id.clone()); + }; + + client_to_sessions + .remove_session(client_id.clone(), session_id.clone()) + .await; + } + + if dropped_sessions.is_empty() { + bail!("No sessions found for client") + } else { + Ok(dropped_sessions) + } +} diff --git a/server/src/ws/client_handler/methods/get_info.rs b/server/src/ws/client_handler/methods/get_info.rs new file mode 100644 index 00000000..c55eb179 --- /dev/null +++ b/server/src/ws/client_handler/methods/get_info.rs @@ -0,0 +1,53 @@ +use crate::{ + errors::NightlyError, + state::{ClientSockets, SendToClient, Sessions}, + structs::{ + client_messages::{ + client_messages::ServerToClient, + get_info::{GetInfoRequest, GetInfoResponse}, + }, + common::ErrorMessage, + }, +}; +use anyhow::{bail, Result}; + +pub async fn process_get_info( + client_id: &String, + sessions: &Sessions, + client_sockets: &ClientSockets, + get_info_request: GetInfoRequest, +) -> Result<()> { + let mut sessions_write = sessions.write().await; + let session = match sessions_write.get_mut(&get_info_request.session_id) { + Some(session) => session, + None => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: get_info_request.response_id, + error: NightlyError::SessionDoesNotExist.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + + bail!( + "Fail, client: {:?} to session: {:?}, session does not exist", + client_id, + get_info_request.session_id + ); + } + }; + + let response = ServerToClient::GetInfoResponse(GetInfoResponse { + response_id: get_info_request.response_id, + network: session.network.clone(), + version: session.version.clone(), + app_metadata: session.app_state.metadata.clone(), + }); + client_sockets + .send_to_client(client_id.clone(), response) + .await + .unwrap_or_default(); + + Ok(()) +} diff --git a/server/src/ws/client_handler/methods/get_pending_requests.rs b/server/src/ws/client_handler/methods/get_pending_requests.rs new file mode 100644 index 00000000..8a59ab80 --- /dev/null +++ b/server/src/ws/client_handler/methods/get_pending_requests.rs @@ -0,0 +1,56 @@ +use crate::{ + errors::NightlyError, + state::{ClientSockets, SendToClient, Sessions}, + structs::{ + client_messages::{ + client_messages::ServerToClient, + get_pending_requests::{GetPendingRequestsRequest, GetPendingRequestsResponse}, + }, + common::ErrorMessage, + }, +}; +use anyhow::{bail, Result}; + +pub async fn process_get_pending_requests( + client_id: &String, + sessions: &Sessions, + client_sockets: &ClientSockets, + get_pending_requests: GetPendingRequestsRequest, +) -> Result<()> { + let mut sessions_write = sessions.write().await; + let session = match sessions_write.get_mut(&get_pending_requests.session_id) { + Some(session) => session, + None => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: get_pending_requests.response_id, + error: NightlyError::SessionDoesNotExist.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + + bail!( + "Fail, client: {:?} to session: {:?}, session does not exist", + client_id, + get_pending_requests.session_id + ); + } + }; + + let pending_requests = session + .pending_requests + .values() + .cloned() + .collect::>(); + let response = ServerToClient::GetPendingRequestsResponse(GetPendingRequestsResponse { + requests: pending_requests, + response_id: get_pending_requests.response_id, + }); + client_sockets + .send_to_client(client_id.clone(), response) + .await + .unwrap_or_default(); + + Ok(()) +} diff --git a/server/src/ws/client_handler/methods/mod.rs b/server/src/ws/client_handler/methods/mod.rs new file mode 100644 index 00000000..262f32f3 --- /dev/null +++ b/server/src/ws/client_handler/methods/mod.rs @@ -0,0 +1,5 @@ +pub mod disconnect_sessions; +pub mod get_info; +pub mod get_pending_requests; +pub mod process_connect; +pub mod process_new_payload_reply; diff --git a/server/src/ws/client_handler/methods/process_connect.rs b/server/src/ws/client_handler/methods/process_connect.rs new file mode 100644 index 00000000..bdc837c0 --- /dev/null +++ b/server/src/ws/client_handler/methods/process_connect.rs @@ -0,0 +1,62 @@ +use crate::{ + errors::NightlyError, + state::{ClientSockets, ClientToSessions, ModifySession, SendToClient, Sessions}, + structs::{ + client_messages::{ + client_messages::ServerToClient, + connect::{ConnectRequest, ConnectResponse}, + }, + common::ErrorMessage, + }, +}; +use anyhow::{bail, Result}; + +pub async fn process_client_connect( + client_id: &String, + sessions: &Sessions, + client_sockets: &ClientSockets, + client_to_sessions: &ClientToSessions, + connect_request: ConnectRequest, +) -> Result<()> { + let mut sessions_write = sessions.write().await; + let session = match sessions_write.get_mut(&connect_request.session_id) { + Some(session) => session, + None => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: connect_request.response_id, + error: NightlyError::SessionDoesNotExist.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + + bail!( + "Failed to connect client: {:?} to session: {:?}, session does not exist", + client_id, + connect_request.session_id + ); + } + }; + + // Update session + session.connect_user(&connect_request).await; + + // Insert new session id into client_to_sessions + client_to_sessions + .add_session( + connect_request.client_id.clone(), + connect_request.session_id.clone(), + ) + .await; + + let client_response = ServerToClient::ConnectResponse(ConnectResponse { + response_id: connect_request.response_id, + }); + client_sockets + .send_to_client(client_id.clone(), client_response) + .await + .unwrap_or_default(); + + Ok(()) +} diff --git a/server/src/ws/client_handler/methods/process_new_payload_reply.rs b/server/src/ws/client_handler/methods/process_new_payload_reply.rs new file mode 100644 index 00000000..af8e8a3a --- /dev/null +++ b/server/src/ws/client_handler/methods/process_new_payload_reply.rs @@ -0,0 +1,72 @@ +use crate::{ + errors::NightlyError, + state::{ClientSockets, SendToClient, Sessions}, + structs::{ + app_messages::{app_messages::ServerToApp, payload::ResponsePayload}, + client_messages::{ + client_messages::ServerToClient, new_payload_event::NewPayloadEventReply, + }, + common::{AckMessage, ErrorMessage}, + }, +}; +use anyhow::{bail, Result}; + +pub async fn process_new_payload_event_reply( + client_id: &String, + sessions: &Sessions, + client_sockets: &ClientSockets, + payload: NewPayloadEventReply, +) -> Result<()> { + let mut sessions_write = sessions.write().await; + let session = match sessions_write.get_mut(&payload.session_id) { + Some(session) => session, + None => { + let error = ServerToClient::ErrorMessage(ErrorMessage { + response_id: payload.response_id, + error: NightlyError::SessionDoesNotExist.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + + bail!( + "Fail, client: {:?} to session: {:?}, session does not exist", + client_id, + payload.session_id + ); + } + }; + + // Update session + if let None = session.pending_requests.remove(&payload.request_id) { + let error: ServerToClient = ServerToClient::ErrorMessage(ErrorMessage { + response_id: payload.response_id, + error: NightlyError::RequestDoesNotExist.to_string(), + }); + client_sockets + .send_to_client(client_id.clone(), error) + .await + .unwrap_or_default(); + + bail!("Failed to process new payload event reply, request does not exist, session_id: {:?}, request_id: {:?}", payload.session_id, payload.request_id); + }; + + // Send response to app + let app_msg = ServerToApp::ResponsePayload(ResponsePayload { + response_id: payload.request_id.clone(), + content: payload.content.clone(), + }); + session.send_to_app(app_msg).await.unwrap_or_default(); + + // Send ack to client + let client_msg = ServerToClient::AckMessage(AckMessage { + response_id: payload.response_id, + }); + client_sockets + .send_to_client(client_id.clone(), client_msg) + .await + .unwrap_or_default(); + + Ok(()) +} diff --git a/server/src/ws/client_handler/mod.rs b/server/src/ws/client_handler/mod.rs new file mode 100644 index 00000000..56320b4d --- /dev/null +++ b/server/src/ws/client_handler/mod.rs @@ -0,0 +1,2 @@ +pub mod handler; +pub mod methods; diff --git a/server/src/ws/mod.rs b/server/src/ws/mod.rs index 4b3d4783..650292b5 100644 --- a/server/src/ws/mod.rs +++ b/server/src/ws/mod.rs @@ -1 +1,2 @@ pub mod app_handler; +pub mod client_handler;