Skip to content

Commit

Permalink
refactor: replace queue lock with tokio mutex
Browse files Browse the repository at this point in the history
Tokio mutex guarantees FIFO ordering based on the documentation so it can be used instead of the custom queue lock
  • Loading branch information
jacobtread committed Oct 13, 2024
1 parent 55f2418 commit 77f3cd2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 141 deletions.
37 changes: 22 additions & 15 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
session::models::{NetworkAddress, QosNetworkData},
utils::{
components::{component_key, user_sessions, DEBUG_IGNORED_PACKETS},
lock::{QueueLock, QueueLockGuard, TicketAcquireFuture},
types::{GameID, PlayerID},
},
};
Expand All @@ -44,7 +43,7 @@ use std::{
task::{ready, Context, Poll},
};
use std::{future::Future, sync::Weak};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, OwnedMutexGuard};
use tokio_util::codec::Framed;

pub mod models;
Expand All @@ -67,8 +66,8 @@ pub struct Session {
pub association: Option<AssociationId>,

/// Lock for handling packets with a session, ensures only one packet is
/// processed at a time and in the same order that it was received
busy_lock: QueueLock,
/// processed at a time and in the same order that it was received / sent
busy_lock: Arc<tokio::sync::Mutex<()>>,

/// Sender for sending packets to the session
tx: mpsc::UnboundedSender<Packet>,
Expand All @@ -82,16 +81,18 @@ pub struct Session {

#[derive(Clone)]
pub struct SessionNotifyHandle {
busy_lock: QueueLock,
busy_lock: Arc<tokio::sync::Mutex<()>>,
tx: mpsc::UnboundedSender<Packet>,
}

impl SessionNotifyHandle {
/// Pushes a new notification packet, this will acquire a queue position
/// waiting until the current response is handled before sending
/// Pushes a new notification packet
pub fn notify(&self, packet: Packet) {
let tx = self.tx.clone();
let busy_lock = self.busy_lock.acquire();

// Acquire the lock position before scheduling the task to ensure correct ordering
let busy_lock = self.busy_lock.clone().lock_owned();

tokio::spawn(async move {
let _guard = busy_lock.await;
let _ = tx.send(packet);
Expand Down Expand Up @@ -243,7 +244,7 @@ impl Session {
let session = Arc::new(Self {
id,
association,
busy_lock: QueueLock::new(),
busy_lock: Default::default(),
tx,
data: Default::default(),
addr,
Expand Down Expand Up @@ -526,14 +527,14 @@ enum ReadState<'a> {
/// Acquiring a lock guard
Acquire {
/// Future for the locking guard
ticket: TicketAcquireFuture,
lock_future: BoxFuture<'static, OwnedMutexGuard<()>>,
/// The packet that was read
packet: Option<Packet>,
},
/// Future for a handler is being polled
Handle {
/// Locking guard
guard: QueueLockGuard,
guard: OwnedMutexGuard<()>,
/// Handle future
future: BoxFuture<'a, Packet>,
},
Expand Down Expand Up @@ -601,18 +602,24 @@ impl SessionFuture<'_> {
let result = ready!(Pin::new(&mut self.io).poll_next(cx));

if let Some(Ok(packet)) = result {
let ticket = self.session.busy_lock.acquire();
let lock_future = self.session.busy_lock.clone().lock_owned();
let lock_future: BoxFuture<'static, OwnedMutexGuard<()>> =
Box::pin(lock_future);

self.read_state = ReadState::Acquire {
ticket,
lock_future,
packet: Some(packet),
}
} else {
// Reader has closed or reading encountered an error (Either way stop reading)
self.stop = true;
}
}
ReadState::Acquire { ticket, packet } => {
let guard = ready!(Pin::new(ticket).poll(cx));
ReadState::Acquire {
lock_future,
packet,
} => {
let guard = ready!(Pin::new(lock_future).poll(cx));
let packet = packet
.take()
.expect("Unexpected acquire state without packet");
Expand Down
125 changes: 0 additions & 125 deletions src/utils/lock.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod components;
pub mod encoding;
pub mod hashing;
pub mod lock;
pub mod logging;
pub mod parsing;
pub mod random_name;
Expand Down

0 comments on commit 77f3cd2

Please sign in to comment.