Skip to content

Commit

Permalink
Merge pull request #85 from nightly-labs/refactor
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
Giems authored Feb 6, 2024
2 parents 2785be7 + 36c1b19 commit 491ed97
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 421 deletions.
416 changes: 0 additions & 416 deletions server/src/app/app_handler.rs

This file was deleted.

2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod app;
pub mod client;
pub mod errors;
pub mod handle_error;
Expand All @@ -8,3 +7,4 @@ pub mod state;
pub mod structs;
pub mod utils;
pub mod wallets;
pub mod ws;
2 changes: 1 addition & 1 deletion server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tower::ServiceBuilder;
use tracing_subscriber::EnvFilter;

use crate::{
app::app_handler::on_new_app_connection,
client::{
client_handler::on_new_client_connection, connect_session::connect_session,
drop_sessions::drop_sessions, get_pending_request::get_pending_request,
Expand All @@ -22,6 +21,7 @@ use crate::{
state::ServerState,
structs::http_endpoints::HttpEndpoint,
utils::get_cors,
ws::app_handler::handler::on_new_app_connection,
};
use tower_http::trace::TraceLayer;
pub async fn get_router() -> Router {
Expand Down
1 change: 0 additions & 1 deletion server/src/sesssion_cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub fn start_cleaning_sessions(sessions: Sessions, client_to_sessions: ClientToS
let _ = socket.close();
}

drop(session);
sessions.remove(&session_id);
}
}
Expand Down
32 changes: 30 additions & 2 deletions server/src/structs/session.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::HashMap;

use crate::state::ClientId;
use crate::{state::ClientId, utils::get_timestamp_in_milliseconds};

use super::{
app_messages::app_messages::ServerToApp,
app_messages::{app_messages::ServerToApp, initialize::InitializeRequest},
common::{AppMetadata, Device, Network, Notification, PendingRequest, SessionStatus, Version},
};
use anyhow::Result;
Expand Down Expand Up @@ -81,6 +81,34 @@ impl Session {
}
}
}

pub fn new(
session_id: &str,
connection_id: Uuid,
sender: SplitSink<WebSocket, Message>,
init_data: &InitializeRequest,
) -> Self {
Session {
session_id: session_id.to_owned(),
status: SessionStatus::WaitingForClient,
persistent: init_data.persistent,
app_state: AppState {
metadata: init_data.app_metadata.clone(),
app_socket: HashMap::from([(connection_id, sender)]),
},
client_state: ClientState {
client_id: None,
device: None,
connected_public_keys: Vec::new(),
metadata: None,
},
network: init_data.network.clone(),
version: init_data.version.clone(),
pending_requests: HashMap::new(),
notification: None,
creation_timestamp: get_timestamp_in_milliseconds(),
}
}
}
#[derive(Debug)]
pub struct AppState {
Expand Down
203 changes: 203 additions & 0 deletions server/src/ws/app_handler/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
use super::methods::{
disconnect_session::disconnect_session, initialize_session::initialize_session_connection,
};
use crate::{
state::{ClientSockets, ClientToSessions, SendToClient, Sessions},
structs::{
app_messages::app_messages::AppToServer,
client_messages::{client_messages::ServerToClient, new_payload_event::NewPayloadEvent},
common::{Device, PendingRequest},
notification_msg::{trigger_notification, NotificationPayload},
},
};
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::Response,
};
use futures::StreamExt;
use log::{debug, error, warn};
use std::net::SocketAddr;

pub async fn on_new_app_connection(
ConnectInfo(ip): ConnectInfo<SocketAddr>,
State(sessions): State<Sessions>,
State(client_sockets): State<ClientSockets>,
State(client_to_sessions): State<ClientToSessions>,
ws: WebSocketUpgrade,
) -> Response {
let ip = ip.clone().to_string().clone();
ws.on_upgrade(move |socket| async move {
debug!("OPEN app connection from {}", ip);
app_handler(socket, sessions, client_sockets, client_to_sessions).await;
debug!("CLOSE app connection from {}", ip);
})
}

pub async fn app_handler(
socket: WebSocket,
sessions: Sessions,
client_sockets: ClientSockets,
client_to_sessions: ClientToSessions,
) {
let (sender, mut receiver) = socket.split();
let connection_id = uuid7::uuid7();
let sessions = sessions.clone();

// Handle the new app connection here
// Wait for initialize message
let session_id = loop {
// If stream is closed, or message is not received, return
let msg = match receiver.next().await {
Some(Ok(msg)) => msg,
Some(Err(_)) | None => {
return;
}
};

let app_msg = match msg {
Message::Text(data) => match serde_json::from_str::<AppToServer>(&data) {
Ok(app_msg) => app_msg,
Err(_) => continue,
},
Message::Close(None) | Message::Close(Some(_)) => {
return;
}
_ => continue,
};

// We only accept initialize messages here
match app_msg {
AppToServer::InitializeRequest(init_data) => {
let session_id =
initialize_session_connection(&connection_id, sender, &sessions, init_data)
.await;

break session_id;
}
_ => {
continue;
}
}
};

// Main loop request handler
loop {
let msg = match receiver.next().await {
Some(Ok(msg)) => msg,
Some(Err(_)) | None => {
// Disconnect session
if let Err(err) = disconnect_session(
session_id.clone(),
connection_id,
&sessions,
&client_sockets,
&client_to_sessions,
)
.await
{
warn!("Error disconnecting session: {}", err);
}

return;
}
};

let app_msg = match msg {
Message::Text(data) => match serde_json::from_str::<AppToServer>(&data) {
Ok(app_msg) => app_msg,
Err(_) => continue,
},
Message::Close(None) | Message::Close(Some(_)) => {
// Disconnect session
if let Err(err) = disconnect_session(
session_id.clone(),
connection_id,
&sessions,
&client_sockets,
&client_to_sessions,
)
.await
{
warn!("Error disconnecting session: {}", err);
}
return;
}
_ => continue,
};

match app_msg {
AppToServer::RequestPayload(sing_transactions_request) => {
let mut sessions = sessions.write().await;
let session = match sessions.get_mut(&session_id) {
Some(session) => session,
None => {
// Should never happen
return;
}
};
let response_id: String = sing_transactions_request.response_id.clone();

session.pending_requests.insert(
response_id.clone(),
PendingRequest {
content: sing_transactions_request.content.clone(),
request_id: sing_transactions_request.response_id.clone(),
},
);
// Response will be sent by the client side
let sign_transactions_event = ServerToClient::NewPayloadEvent(NewPayloadEvent {
payload: sing_transactions_request.content.clone(),
request_id: response_id.clone(),
session_id: session_id.clone(),
});

let client_id = match &session.client_state.client_id {
Some(id) => id,
None => {
// Should never happen
warn!("No client_id found for session: {}", session_id);
continue;
}
};

// Try to send via WS, this can fail as user might not be subscribed to the session via Ws
if let Err(_) = client_sockets
.send_to_client(client_id.clone(), sign_transactions_event)
.await
{
// Fall back to notification
if let Some(notification) = &session.notification {
let notification_payload = NotificationPayload {
network: session.network.clone(),
app_metadata: session.app_state.metadata.clone(),
device: session
.client_state
.device
.clone()
.unwrap_or(Device::Unknown),
request: sing_transactions_request.content.clone(),
request_id: response_id.clone(),
session_id: session_id.clone(),
token: notification.token.clone(),
};

// This will never fail as we are not waiting for a response
trigger_notification(
notification.notification_endpoint.clone(),
notification_payload,
)
.await
.unwrap_or_default();
}
}
}
AppToServer::InitializeRequest(_) => {
// App should not send initialize message after the first one
continue;
}
}
}
}
71 changes: 71 additions & 0 deletions server/src/ws/app_handler/methods/disconnect_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::{
state::{ClientSockets, ClientToSessions, ModifySession, SendToClient, Sessions},
structs::{
client_messages::{
app_disconnected_event::AppDisconnectedEvent, client_messages::ServerToClient,
},
common::SessionStatus,
},
};
use anyhow::{bail, Result};
use log::warn;
use uuid7::Uuid;

pub async fn disconnect_session(
session_id: String,
connection_id: Uuid,
sessions: &Sessions,
client_sockets: &ClientSockets,
client_to_sessions: &ClientToSessions,
) -> Result<()> {
// Lock the whole sessions map as we might need to remove a session
let mut sessions = sessions.write().await;
let session = match sessions.get_mut(&session_id) {
Some(session) => session,
None => {
// Should never happen
bail!("Session not found, session_id: {}", session_id);
}
};

// Close user socket
if let Some(client_id) = &session.client_state.client_id {
let app_disconnected_event = ServerToClient::AppDisconnectedEvent(AppDisconnectedEvent {
session_id: session_id.clone(),
reason: "App disconnected".to_string(),
});

if let Err(err) = client_sockets
.send_to_client(client_id.clone(), app_disconnected_event)
.await
{
warn!(
"Error sending app disconnected event to client: {}, session_id: {}, err: {}",
client_id, session_id, err
);
}
}

// Close app socket
if let Err(err) = session.close_app_socket(&connection_id).await {
warn!(
"Error sending app disconnected event to connection_id: {}, session_id: {}, err: {}",
connection_id, session_id, err
);
}

// Update session status based on session type
if session.persistent {
session.update_status(SessionStatus::AppDisconnected);
} else {
// Remove session
if let Some(client_id) = session.client_state.client_id.clone() {
client_to_sessions
.remove_session(client_id, session_id.clone())
.await;
}
sessions.remove(&session_id);
}

Ok(())
}
Loading

0 comments on commit 491ed97

Please sign in to comment.