Skip to content

Commit

Permalink
wip: use async unsend::RwLock for clients list
Browse files Browse the repository at this point in the history
  • Loading branch information
gwen-lg committed Jan 18, 2025
1 parent 70956fd commit fadd9f4
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 12 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,6 @@ zero_sized_map_values = "warn"

# eframe = { path = "../../egui/crates/eframe" }
# egui = { path = "../../egui/crates/egui" }

# use local async-lock as dependency of async-std for dev-dependencies, instead of published one
async-lock = { path = "../_crates/async-lock/" }
1 change: 1 addition & 0 deletions puffin_http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ puffin = { version = "0.19.1", path = "../puffin", features = [
"lz4",
"serialization",
] }
unsend = { version = "0.2.1", default-features = false }
#smol = "2.0"

[dev-dependencies]
Expand Down
21 changes: 11 additions & 10 deletions puffin_http/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::Context as _;
use async_executor::{LocalExecutor, Task};
use async_net::{SocketAddr, TcpListener, TcpStream};

use futures_lite::{future, AsyncWriteExt};
use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection};
use std::{
cell::RefCell,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use unsend::lock::RwLock;

/// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough.
const MAX_FRAMES_IN_QUEUE: usize = 30;
Expand Down Expand Up @@ -267,7 +268,7 @@ impl Server {
) -> anyhow::Result<()> {
let executor = Rc::new(LocalExecutor::new());

let clients = Rc::new(RefCell::new(Vec::new()));
let clients = Rc::new(RwLock::new(Vec::new()));
let clients_cloned = clients.clone();
let num_clients_cloned = num_clients.clone();

Expand Down Expand Up @@ -304,6 +305,7 @@ impl Server {
log::warn!("puffin server failure: {}", err);
}
}
log::trace!("End to Wait frame to send");
});
//.context("Couldn't spawn ps-send task")?;

Expand Down Expand Up @@ -358,7 +360,7 @@ impl Drop for Client {
struct PuffinServerConnection<'a> {
executor: Rc<LocalExecutor<'a>>,
tcp_listener: TcpListener,
clients: Rc<RefCell<Vec<Client>>>,
clients: Rc<RwLock<Vec<Client>>>,
num_clients: Arc<AtomicUsize>,
}

Expand All @@ -382,14 +384,14 @@ impl<'a> PuffinServerConnection<'a> {
// Send all scopes when new client connects.
// TODO: send all previous scopes at connection, not on regular send
//self.send_all_scopes = true;
self.clients.borrow_mut().push(Client {
self.clients.write().await.push(Client {
client_addr,
packet_tx: Some(packet_tx),
join_handle: Some(join_handle),
send_all_scopes: true,
});
self.num_clients
.store(self.clients.borrow().len(), Ordering::SeqCst);
.store(self.clients.read().await.len(), Ordering::SeqCst);
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break; // Nothing to do for now.
Expand All @@ -405,14 +407,14 @@ impl<'a> PuffinServerConnection<'a> {

/// streams to client puffin profiler data.
struct PuffinServerSend {
clients: Rc<RefCell<Vec<Client>>>,
clients: Rc<RwLock<Vec<Client>>>,
num_clients: Arc<AtomicUsize>,
scope_collection: ScopeCollection,
}

impl PuffinServerSend {
pub async fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> {
if self.clients.borrow().is_empty() {
if self.clients.read().await.is_empty() {
return Ok(());
}
puffin::profile_function!();
Expand Down Expand Up @@ -443,9 +445,8 @@ impl PuffinServerSend {
let packet_all_scopes: Packet = packet_all_scopes.into();

// Send frame to clients, remove disconnected clients and update num_clients var
let clients = self.clients.borrow();
let mut idx_to_remove = Vec::new();
for (idx, client) in clients.iter().enumerate() {
for (idx, client) in self.clients.read().await.iter().enumerate() {
let packet = if client.send_all_scopes {
packet_all_scopes.clone()
} else {
Expand All @@ -456,7 +457,7 @@ impl PuffinServerSend {
}
}

let mut clients = self.clients.borrow_mut();
let mut clients = self.clients.write().await;
idx_to_remove.iter().rev().for_each(|idx| {
clients.remove(*idx);
});
Expand Down

0 comments on commit fadd9f4

Please sign in to comment.