Skip to content

Commit

Permalink
Merge pull request #87 from nightly-labs/refactor
Browse files Browse the repository at this point in the history
"Refactor" http requests
  • Loading branch information
Giems authored Feb 7, 2024
2 parents 71d0afc + 7d43279 commit 2834f7e
Show file tree
Hide file tree
Showing 24 changed files with 476 additions and 394 deletions.
7 changes: 0 additions & 7 deletions server/src/client/get_wallets_metadata.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use axum::{extract::State, http::StatusCode, Json};
use serde::{Deserialize, Serialize};
use ts_rs::TS;

use crate::{
errors::NightlyError,
state::{ClientToSessions, ModifySession, Sessions},
structs::{
app_messages::{app_messages::ServerToApp, user_connected_event::UserConnectedEvent},
common::{Device, Notification, SessionStatus},
},
structs::common::{Device, Notification},
};
use axum::{extract::State, http::StatusCode, Json};
use serde::{Deserialize, Serialize};
use ts_rs::TS;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TS)]
#[ts(export)]
Expand Down Expand Up @@ -45,32 +41,28 @@ pub async fn connect_session(
))
}
};

// Insert user socket
session.update_status(SessionStatus::ClientConnected);
session.client_state.device = request.device.clone();
session.client_state.connected_public_keys = request.public_keys.clone();
session.client_state.metadata = request.metadata.clone();
session.client_state.client_id = Some(request.client_id.clone());
// notification
if let Some(notification) = request.notification.clone() {
session.notification = Some(notification);
}
let app_event = ServerToApp::UserConnectedEvent(UserConnectedEvent {
public_keys: request.public_keys,
metadata: request.metadata,
});
match session.send_to_app(app_event).await {
Ok(_) => {}
Err(_) => {
return Err((
session
.connect_user(
&request.device,
&request.public_keys,
&request.metadata,
&request.client_id,
&request.notification,
)
.await
.map_err(|_| {
return (
StatusCode::BAD_REQUEST,
NightlyError::AppDisconnected.to_string(),
))
}
};
);
})?;

// Insert new session id into client_to_sessions
client_to_sessions
.add_session(request.client_id.clone(), request.session_id.clone())
.await;

return Ok(Json(HttpConnectSessionResponse {}));
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn drop_sessions(
Json(request): Json<HttpDropSessionsRequest>,
) -> Result<Json<HttpDropSessionsResponse>, (StatusCode, String)> {
let mut dropped_sessions = Vec::new();
// TODO handle disconnecting app

for session_id in request.sessions {
if sessions.disconnect_user(session_id.clone()).await.is_ok() {
dropped_sessions.push(session_id.clone());
Expand All @@ -36,5 +36,6 @@ pub async fn drop_sessions(
.remove_session(request.client_id.clone(), session_id)
.await;
}

Ok(Json(HttpDropSessionsResponse { dropped_sessions }))
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,25 @@ pub async fn get_pending_request(
))
}
};

if session.client_state.client_id != Some(request.client_id.clone()) {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::UserNotConnected.to_string(),
));
}
let pending_request = match session.pending_requests.get(&request.request_id) {
Some(pending_request) => pending_request,

match session.pending_requests.get(&request.request_id) {
Some(pending_request) => {
return Ok(Json(HttpGetPendingRequestResponse {
request: pending_request.clone(),
}))
}
None => {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::RequestDoesNotExist.to_string(),
))
}
};

Ok(Json(HttpGetPendingRequestResponse {
request: pending_request.clone(),
}))
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub async fn get_pending_requests(
))
}
};

if session.client_state.client_id != Some(request.client_id.clone()) {
return Err((
StatusCode::BAD_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub async fn get_session_info(
))
}
};

let response = HttpGetSessionInfoResponse {
status: session.status.clone(),
persistent: session.persistent,
Expand Down
File renamed without changes.
9 changes: 9 additions & 0 deletions server/src/http/get_wallets_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::structs::wallet_metadata::WalletMetadata;
use axum::{extract::State, http::StatusCode, Json};
use std::{ops::Deref, sync::Arc};

pub async fn get_wallets_metadata(
State(wallets_metadata): State<Arc<Vec<WalletMetadata>>>,
) -> Result<Json<Vec<WalletMetadata>>, (StatusCode, String)> {
Ok(Json(wallets_metadata.deref().clone()))
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub async fn resolve_request(
State(sessions): State<Sessions>,
Json(request): Json<HttpResolveRequestRequest>,
) -> Result<Json<HttpResolveRequestResponse>, (StatusCode, String)> {
// Get session
let mut sessions = sessions.write().await;

let session = match sessions.get_mut(&request.session_id) {
Some(session) => session,
None => {
Expand All @@ -37,36 +37,33 @@ pub async fn resolve_request(
))
}
};
// Check if client_id matches

// Check if client_id matches
if session.client_state.client_id != Some(request.client_id.clone()) {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::UserNotConnected.to_string(),
));
}
let _pending_request = match session.pending_requests.remove(&request.request_id) {
Some(pending_request) => pending_request,
None => {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::RequestDoesNotExist.to_string(),
))
}
// Remove request from pending requests
if let None = session.pending_requests.remove(&request.request_id) {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::RequestDoesNotExist.to_string(),
));
};

// Send to app
let app_msg = ServerToApp::ResponsePayload(ResponsePayload {
response_id: request.request_id.clone(),
content: request.content.clone(),
});
match session.send_to_app(app_msg).await {
Ok(_) => {}
Err(_) => {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::AppDisconnected.to_string(),
))
}
if let Err(_) = session.send_to_app(app_msg).await {
return Err((
StatusCode::BAD_REQUEST,
NightlyError::AppDisconnected.to_string(),
));
};

return Ok(Json(HttpResolveRequestResponse {}));
}
3 changes: 1 addition & 2 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
pub mod client;
pub mod errors;
pub mod handle_error;
pub mod http;
pub mod router;
mod sesssion_cleaner;
pub mod state;
pub mod structs;
pub mod utils;
pub mod wallets;
pub mod ws;
26 changes: 13 additions & 13 deletions server/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
use std::time::Duration;

use axum::{
error_handling::HandleErrorLayer,
routing::{get, post},
Router,
};
use tower::ServiceBuilder;
use tracing_subscriber::EnvFilter;

use crate::{
client::{
handle_error::handle_error,
http::{
connect_session::connect_session, drop_sessions::drop_sessions,
get_pending_request::get_pending_request, get_pending_requests::get_pending_requests,
get_session_info::get_session_info, get_sessions::get_sessions,
get_wallets_metadata::get_wallets_metadata, resolve_request::resolve_request,
},
handle_error::handle_error,
sesssion_cleaner::start_cleaning_sessions,
state::ServerState,
structs::http_endpoints::HttpEndpoint,
utils::get_cors,
utils::{get_cors, get_wallets_metadata_vec},
ws::{
app_handler::handler::on_new_app_connection,
client_handler::handler::on_new_client_connection,
},
};
use axum::{
error_handling::HandleErrorLayer,
routing::{get, post},
Router,
};
use std::{sync::Arc, time::Duration};
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing_subscriber::EnvFilter;

pub async fn get_router() -> Router {
let state = ServerState {
sessions: Default::default(),
client_to_sessions: Default::default(),
client_to_sockets: Default::default(),
wallets_metadata: Arc::new(get_wallets_metadata_vec()),
};
// Start cleaning outdated sessions
start_cleaning_sessions(state.sessions.clone(), state.client_to_sessions.clone());
Expand Down
22 changes: 14 additions & 8 deletions server/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::structs::{client_messages::client_messages::ServerToClient, session::Session};
use crate::structs::{
client_messages::client_messages::ServerToClient, session::Session,
wallet_metadata::WalletMetadata,
};
use anyhow::Result;
use async_trait::async_trait;
use axum::extract::{
Expand All @@ -17,6 +20,16 @@ pub type SessionId = String;
pub type ClientId = String;
pub type Sessions = Arc<RwLock<HashMap<SessionId, Session>>>;
pub type ClientSockets = Arc<RwLock<HashMap<ClientId, RwLock<SplitSink<WebSocket, Message>>>>>;
pub type ClientToSessions = Arc<RwLock<HashMap<ClientId, RwLock<HashSet<SessionId>>>>>;

#[derive(Clone, FromRef)]
pub struct ServerState {
pub sessions: Sessions,
pub client_to_sockets: ClientSockets, // Holds only live sockets
pub client_to_sessions: ClientToSessions,
pub wallets_metadata: Arc<Vec<WalletMetadata>>,
}

#[async_trait]
pub trait DisconnectUser {
async fn disconnect_user(&self, session_id: SessionId) -> Result<()>;
Expand Down Expand Up @@ -69,13 +82,6 @@ impl SendToClient for ClientSockets {
}
}
}
pub type ClientToSessions = Arc<RwLock<HashMap<ClientId, RwLock<HashSet<SessionId>>>>>;
#[derive(Clone, FromRef)]
pub struct ServerState {
pub sessions: Sessions,
pub client_to_sockets: ClientSockets, // Holds only live sockets
pub client_to_sessions: ClientToSessions,
}

#[async_trait]
pub trait ModifySession {
Expand Down
1 change: 1 addition & 0 deletions server/src/structs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod notification_msg;
pub mod session;
pub mod wallet_metadata;
pub mod wallet_type;
pub mod wallets;
38 changes: 23 additions & 15 deletions server/src/structs/session.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use std::collections::HashMap;

use crate::{state::ClientId, utils::get_timestamp_in_milliseconds};

use super::{
app_messages::{
app_messages::ServerToApp, initialize::InitializeRequest,
user_connected_event::UserConnectedEvent, user_disconnected_event::UserDisconnectedEvent,
},
client_messages::connect::ConnectRequest,
common::{AppMetadata, Device, Network, Notification, PendingRequest, SessionStatus, Version},
};
use anyhow::Result;
use crate::{state::ClientId, utils::get_timestamp_in_milliseconds};
use anyhow::{bail, Result};
use axum::extract::ws::{Message, WebSocket};
use futures::{stream::SplitSink, SinkExt};
use log::{info, warn};
use std::collections::HashMap;
use uuid7::Uuid;

#[derive(Debug)]
Expand Down Expand Up @@ -114,26 +111,37 @@ impl Session {
}
}

pub async fn connect_user(&mut self, connect_request: &ConnectRequest) {
pub async fn connect_user(
&mut self,
device: &Option<Device>,
public_keys: &Vec<String>,
metadata: &Option<String>,
client_id: &String,
notification: &Option<Notification>,
) -> Result<()> {
// Update session status
self.update_status(SessionStatus::ClientConnected);

// Update client state
self.client_state.device = connect_request.device.clone();
self.client_state.connected_public_keys = connect_request.public_keys.clone();
self.client_state.metadata = connect_request.metadata.clone();
self.client_state.client_id = Some(connect_request.client_id.clone());
self.client_state.device = device.clone();
self.client_state.connected_public_keys = public_keys.clone();
self.client_state.metadata = metadata.clone();
self.client_state.client_id = Some(client_id.clone());

if let Some(notification) = &connect_request.notification {
if let Some(notification) = notification {
self.notification = Some(notification.clone());
}

// Send user connected event to app
let app_event = ServerToApp::UserConnectedEvent(UserConnectedEvent {
public_keys: connect_request.public_keys.clone(),
metadata: connect_request.metadata.clone(),
public_keys: public_keys.clone(),
metadata: metadata.clone(),
});
self.send_to_app(app_event).await.unwrap_or_default();

match self.send_to_app(app_event).await {
Ok(_) => Ok(()),
Err(err) => bail!("Failed to send message to app: {:?}", err),
}
}

pub async fn disconnect_user(&mut self) {
Expand Down
Loading

0 comments on commit 2834f7e

Please sign in to comment.