Skip to content

Commit

Permalink
refactor client ws handler
Browse files Browse the repository at this point in the history
  • Loading branch information
“Giems” committed Feb 6, 2024
1 parent 36c1b19 commit 5ebde56
Show file tree
Hide file tree
Showing 14 changed files with 580 additions and 446 deletions.
437 changes: 0 additions & 437 deletions server/src/client/client_handler.rs

This file was deleted.

1 change: 0 additions & 1 deletion server/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod client_handler;
pub mod connect_session;
pub mod drop_sessions;
pub mod get_pending_request;
Expand Down
14 changes: 8 additions & 6 deletions server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ use tracing_subscriber::EnvFilter;

use crate::{
client::{
client_handler::on_new_client_connection, connect_session::connect_session,
drop_sessions::drop_sessions, get_pending_request::get_pending_request,
get_pending_requests::get_pending_requests, get_session_info::get_session_info,
get_sessions::get_sessions, get_wallets_metadata::get_wallets_metadata,
resolve_request::resolve_request,
connect_session::connect_session, drop_sessions::drop_sessions,
get_pending_request::get_pending_request, get_pending_requests::get_pending_requests,
get_session_info::get_session_info, get_sessions::get_sessions,
get_wallets_metadata::get_wallets_metadata, resolve_request::resolve_request,
},
handle_error::handle_error,
sesssion_cleaner::start_cleaning_sessions,
state::ServerState,
structs::http_endpoints::HttpEndpoint,
utils::get_cors,
ws::app_handler::handler::on_new_app_connection,
ws::{
app_handler::handler::on_new_app_connection,
client_handler::handler::on_new_client_connection,
},
};
use tower_http::trace::TraceLayer;
pub async fn get_router() -> Router {
Expand Down
28 changes: 27 additions & 1 deletion server/src/structs/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use std::collections::HashMap;
use crate::{state::ClientId, utils::get_timestamp_in_milliseconds};

use super::{
app_messages::{app_messages::ServerToApp, initialize::InitializeRequest},
app_messages::{
app_messages::ServerToApp, initialize::InitializeRequest,
user_connected_event::UserConnectedEvent,
},
client_messages::connect::ConnectRequest,
common::{AppMetadata, Device, Network, Notification, PendingRequest, SessionStatus, Version},
};
use anyhow::Result;
Expand Down Expand Up @@ -109,6 +113,28 @@ impl Session {
creation_timestamp: get_timestamp_in_milliseconds(),
}
}

pub async fn connect_user(&mut self, connect_request: &ConnectRequest) {
// Update session status
self.update_status(SessionStatus::ClientConnected);

// Update client state
self.client_state.device = connect_request.device.clone();
self.client_state.connected_public_keys = connect_request.public_keys.clone();
self.client_state.metadata = connect_request.metadata.clone();
self.client_state.client_id = Some(connect_request.client_id.clone());

if let Some(notification) = &connect_request.notification {
self.notification = Some(notification.clone());
}

// Send user connected event to app
let app_event = ServerToApp::UserConnectedEvent(UserConnectedEvent {
public_keys: connect_request.public_keys.clone(),
metadata: connect_request.metadata.clone(),
});
self.send_to_app(app_event).await.unwrap_or_default();
}
}
#[derive(Debug)]
pub struct AppState {
Expand Down
2 changes: 1 addition & 1 deletion server/src/ws/app_handler/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use axum::{
response::Response,
};
use futures::StreamExt;
use log::{debug, error, warn};
use log::{debug, warn};
use std::net::SocketAddr;

pub async fn on_new_app_connection(
Expand Down
259 changes: 259 additions & 0 deletions server/src/ws/client_handler/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
use crate::{
errors::NightlyError,
state::{ClientSockets, ClientToSessions, ModifySession, SendToClient, Sessions},
structs::{
client_messages::{
client_initialize::ClientInitializeResponse,
client_messages::{ClientToServer, ServerToClient},
drop_sessions::DropSessionsResponse,
get_sessions::GetSessionsResponse,
},
common::ErrorMessage,
},
ws::client_handler::methods::{
disconnect_sessions::disconnect_client_sessions, get_info::process_get_info,
get_pending_requests::process_get_pending_requests,
process_connect::process_client_connect,
process_new_payload_reply::process_new_payload_event_reply,
},
};
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::Response,
};
use futures::StreamExt;
use log::{debug, info, warn};
use std::net::SocketAddr;
use tokio::sync::RwLock;

pub async fn on_new_client_connection(
ConnectInfo(ip): ConnectInfo<SocketAddr>,
State(sessions): State<Sessions>,
State(client_sockets): State<ClientSockets>,
State(client_to_sessions): State<ClientToSessions>,
ws: WebSocketUpgrade,
) -> Response {
let ip = ip.clone().to_string().clone();
ws.on_upgrade(move |socket| async move {
debug!("OPEN client connection from {}", ip);
client_handler(socket, sessions, client_sockets, client_to_sessions).await;
debug!("CLOSE client connection from {}", ip);
})
}

pub async fn client_handler(
socket: WebSocket,
sessions: Sessions,
client_sockets: ClientSockets,
client_to_sessions: ClientToSessions,
) {
let (sender, mut receiver) = socket.split();
let sessions = sessions.clone();
// Handle the new app connection here
// Wait for initialize message
let client_id: String = loop {
// If stream is closed, or message is not received, return
let msg = match receiver.next().await {
Some(Ok(msg)) => msg,
Some(Err(_)) | None => {
return;
}
};

let app_msg = match msg {
Message::Text(data) => match serde_json::from_str::<ClientToServer>(&data) {
Ok(app_msg) => app_msg,
Err(_) => continue,
},
Message::Close(None) | Message::Close(Some(_)) => {
return;
}
_ => continue,
};

match app_msg {
ClientToServer::ClientInitializeRequest(connect_request) => {
// Insert client socket
{
let mut client_sockets_write = client_sockets.write().await;
client_sockets_write
.insert(connect_request.client_id.clone(), RwLock::new(sender));
}
// Send response
let client_msg =
ServerToClient::ClientInitializeResponse(ClientInitializeResponse {
response_id: connect_request.response_id,
});
client_sockets
.send_to_client(connect_request.client_id.clone(), client_msg)
.await
.unwrap_or_default();
break connect_request.client_id;
}
_ => {
continue;
}
}
};
info!("Client connected: {}", client_id);

// Main loop request handler
loop {
let msg = match receiver.next().await {
Some(Ok(msg)) => msg,
Some(Err(_)) | None => {
// Disconnect all user sessions
if let Err(err) = disconnect_client_sessions(
client_id.clone(),
&sessions,
&client_to_sessions,
None,
)
.await
{
warn!("Error disconnecting session: {}", err);
}

// Remove client socket
client_sockets
.close_client_socket(client_id.clone())
.await
.unwrap_or_default();

return;
}
};

let app_msg = match msg {
Message::Text(data) => match serde_json::from_str::<ClientToServer>(&data) {
Ok(app_msg) => app_msg,
Err(_) => continue,
},
Message::Close(None) | Message::Close(Some(_)) => {
// Disconnect all user sessions
if let Err(err) = disconnect_client_sessions(
client_id.clone(),
&sessions,
&client_to_sessions,
None,
)
.await
{
warn!("Error disconnecting session: {}", err);
}
// Remove client socket
client_sockets
.close_client_socket(client_id.clone())
.await
.unwrap_or_default();

return;
}
_ => continue,
};
info!("Client {} new msg {:?}", client_id, app_msg);

match app_msg {
ClientToServer::ConnectRequest(connect_request) => {
if let Err(err) = process_client_connect(
&client_id,
&sessions,
&client_sockets,
&client_to_sessions,
connect_request,
)
.await
{
warn!("Error processing connect request: {}", err);
}
}
ClientToServer::NewPayloadEventReply(new_payload_event_reply) => {
if let Err(err) = process_new_payload_event_reply(
&client_id,
&sessions,
&client_sockets,
new_payload_event_reply,
)
.await
{
warn!("Error processing new payload event reply: {}", err);
}
}
ClientToServer::GetInfoRequest(get_info_request) => {
if let Err(err) =
process_get_info(&client_id, &sessions, &client_sockets, get_info_request).await
{
warn!("Error processing get info request: {}", err);
}
}
ClientToServer::GetPendingRequestsRequest(get_pending_requests_request) => {
if let Err(err) = process_get_pending_requests(
&client_id,
&sessions,
&client_sockets,
get_pending_requests_request,
)
.await
{
warn!("Error processing get pending requests request: {}", err);
}
}
ClientToServer::GetSessionsRequest(get_sessions_request) => {
let sessions = client_to_sessions.get_sessions(client_id.clone()).await;
let response = ServerToClient::GetSessionsResponse(GetSessionsResponse {
sessions,
response_id: get_sessions_request.response_id,
});
client_sockets
.send_to_client(client_id.clone(), response)
.await
.unwrap_or_default();
}
ClientToServer::DropSessionsRequest(drop_sessions_request) => {
match disconnect_client_sessions(
client_id.clone(),
&sessions,
&client_to_sessions,
Some(&drop_sessions_request.sessions),
)
.await
{
Ok(dropped_sessions) => {
let response = ServerToClient::DropSessionsResponse(DropSessionsResponse {
dropped_sessions,
response_id: drop_sessions_request.response_id,
});
client_sockets
.send_to_client(client_id.clone(), response)
.await
.unwrap_or_default();
}
Err(err) => {
let error = ServerToClient::ErrorMessage(ErrorMessage {
response_id: drop_sessions_request.response_id,
error: err.to_string(),
});
client_sockets
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();
}
}
}
ClientToServer::ClientInitializeRequest(_) => {
let error = ServerToClient::ErrorMessage(ErrorMessage {
response_id: "".to_string(),
error: NightlyError::ClientAlreadyInitialized.to_string(),
});
client_sockets
.send_to_client(client_id.clone(), error)
.await
.unwrap_or_default();
}
}
info!("Client {} msg handled", client_id);
}
}
34 changes: 34 additions & 0 deletions server/src/ws/client_handler/methods/disconnect_sessions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::state::{ClientToSessions, DisconnectUser, ModifySession, Sessions};
use anyhow::{bail, Result};

pub async fn disconnect_client_sessions(
client_id: String,
sessions: &Sessions,
client_to_sessions: &ClientToSessions,
sessions_list: Option<&Vec<String>>,
) -> Result<Vec<String>> {
// If not specified get all sessions for the client
let user_sessions = match sessions_list {
Some(sessions) => sessions.clone(),
None => client_to_sessions.get_sessions(client_id.clone()).await,
};

let mut dropped_sessions = Vec::new();

// Send user disconnected event to all sessions
for session_id in user_sessions {
if sessions.disconnect_user(session_id.clone()).await.is_ok() {
dropped_sessions.push(session_id.clone());
};

client_to_sessions
.remove_session(client_id.clone(), session_id.clone())
.await;
}

if dropped_sessions.is_empty() {
bail!("No sessions found for client")
} else {
Ok(dropped_sessions)
}
}
Loading

0 comments on commit 5ebde56

Please sign in to comment.