Skip to content

Commit

Permalink
Merge pull request #84 from nightly-labs/dashmap-removal-part-2
Browse files Browse the repository at this point in the history
replace remaining dashmap
  • Loading branch information
Giems authored Feb 5, 2024
2 parents 5b560cd + 629e0ac commit 2785be7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 40 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ resolver = "2"
members = ["server"]
[workspace.dependencies]

dashmap = { version = "5.4.0", features = ["serde"] }
ctrlc = "3.4.2"
serde = { version = "1.0.196", features = ["derive"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
2 changes: 1 addition & 1 deletion sdk/commonTestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const smartDelay = async (ms?: number) => {
if (process.env.IS_CI) {
await sleep(ms || 100)
} else {
await sleep(ms || 5)
await sleep(ms || 50)
}
}
}
1 change: 0 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2021"
[dependencies]
serde = { workspace = true }
tokio = { workspace = true }
dashmap = { workspace = true }
serde_json = { workspace = true }
ts-rs = { workspace = true }
futures = { workspace = true }
Expand Down
49 changes: 26 additions & 23 deletions server/src/app/app_handler.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
use std::{collections::HashMap, net::SocketAddr};

use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::Response,
};
use futures::{SinkExt, StreamExt};
use log::{debug, warn};

use crate::{
state::{ClientSockets, ClientToSessions, ModifySession, SendToClient, Sessions},
structs::{
app_messages::{
already_connected::AlreadyConnected,
app_messages::{AppToServer, ServerToApp},
initialize::InitializeResponse,
},
Expand All @@ -28,6 +15,16 @@ use crate::{
},
utils::get_timestamp_in_milliseconds,
};
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::Response,
};
use futures::StreamExt;
use log::debug;
use std::{collections::HashMap, net::SocketAddr};

pub async fn on_new_app_connection(
ConnectInfo(ip): ConnectInfo<SocketAddr>,
Expand All @@ -50,7 +47,7 @@ pub async fn app_handler(
client_sockets: ClientSockets,
client_to_sessions: ClientToSessions,
) {
let (mut sender, mut receiver) = socket.split();
let (sender, mut receiver) = socket.split();
let connection_id = uuid7::uuid7();
// Handle the new app connection here
// Wait for initialize message
Expand Down Expand Up @@ -90,7 +87,7 @@ pub async fn app_handler(
let (session_id, created_new) = {
let mut sessions = sessions.write().await;
match sessions.get_mut(session_id.as_str()) {
Some(mut session) => {
Some(session) => {
session.update_status(SessionStatus::AppConnected);
session
.app_state
Expand Down Expand Up @@ -225,10 +222,13 @@ pub async fn app_handler(
}
false => {
// Remove session
client_to_sessions.remove_session(
session.client_state.client_id.clone().unwrap_or_default(),
session_id.clone(),
);
client_to_sessions
.remove_session(
session.client_state.client_id.clone().unwrap_or_default(),
session_id.clone(),
)
.await;

drop(session);
sessions.remove(&session_id);
}
Expand Down Expand Up @@ -270,10 +270,13 @@ pub async fn app_handler(
}
false => {
// Remove session
client_to_sessions.remove_session(
session.client_state.client_id.clone().unwrap_or_default(),
session_id.clone(),
);
client_to_sessions
.remove_session(
session.client_state.client_id.clone().unwrap_or_default(),
session_id.clone(),
)
.await;

drop(session);
sessions.remove(&session_id);
}
Expand Down
8 changes: 7 additions & 1 deletion server/src/client/client_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use axum::{
use futures::StreamExt;
use log::{debug, info};
use std::net::SocketAddr;
use tokio::sync::RwLock;

pub async fn on_new_client_connection(
ConnectInfo(ip): ConnectInfo<SocketAddr>,
Expand Down Expand Up @@ -86,7 +87,12 @@ pub async fn client_handler(
};
match app_msg {
ClientToServer::ClientInitializeRequest(connect_request) => {
client_sockets.insert(connect_request.client_id.clone(), sender);
// Insert client socket
{
let mut client_sockets_write = client_sockets.write().await;
client_sockets_write
.insert(connect_request.client_id.clone(), RwLock::new(sender));
}
// Send response
let client_msg =
ServerToClient::ClientInitializeResponse(ClientInitializeResponse {
Expand Down
23 changes: 10 additions & 13 deletions server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use axum::extract::{
ws::{Message, WebSocket},
FromRef,
};
use dashmap::DashMap;
use futures::{stream::SplitSink, SinkExt};
use log::info;
use std::{
Expand All @@ -21,7 +20,7 @@ use tokio::sync::RwLock;
pub type SessionId = String;
pub type ClientId = String;
pub type Sessions = Arc<RwLock<HashMap<SessionId, Session>>>;
pub type ClientSockets = Arc<DashMap<ClientId, SplitSink<WebSocket, Message>>>;
pub type ClientSockets = Arc<RwLock<HashMap<ClientId, RwLock<SplitSink<WebSocket, Message>>>>>;
#[async_trait]
pub trait DisconnectUser {
async fn disconnect_user(&self, session_id: SessionId) -> Result<()>;
Expand Down Expand Up @@ -53,13 +52,16 @@ pub trait SendToClient {
async fn send_to_client(&self, client_id: ClientId, msg: ServerToClient) -> Result<()>;
async fn close_client_socket(&self, client_id: ClientId) -> Result<()>;
}

#[async_trait]
impl SendToClient for ClientSockets {
async fn send_to_client(&self, client_id: ClientId, msg: ServerToClient) -> Result<()> {
match &mut self.get_mut(&client_id) {
match self.read().await.get(&client_id) {
Some(client_socket) => {
info!("Send to client {}, msg: {:?}", client_id, msg);
return Ok(client_socket
.write()
.await
.send(Message::Text(
serde_json::to_string(&msg).expect("Serialization should work"),
))
Expand All @@ -70,9 +72,9 @@ impl SendToClient for ClientSockets {
}
async fn close_client_socket(&self, client_id: ClientId) -> Result<()> {
info!("Close client socket {}", client_id);
match &mut self.remove(&client_id) {
Some((_, client_socket)) => {
return Ok(client_socket.close().await?);
match self.write().await.remove(&client_id) {
Some(client_socket) => {
return Ok(client_socket.write().await.close().await?);
}
None => Err(anyhow::anyhow!("No client socket found for session")),
}
Expand Down Expand Up @@ -124,13 +126,8 @@ impl ModifySession for ClientToSessions {
}

async fn get_sessions(&self, client_id: ClientId) -> Vec<SessionId> {
let clients_read = self.read().await;
match clients_read.get(&client_id) {
Some(sessions) => {
let client_sessions = sessions.read().await;

client_sessions.iter().cloned().collect()
}
match self.read().await.get(&client_id) {
Some(sessions) => sessions.read().await.iter().cloned().collect(),
None => vec![],
}
}
Expand Down

0 comments on commit 2785be7

Please sign in to comment.