From 77f3cd2bd4d5228daaf25776bd9982cd052786a1 Mon Sep 17 00:00:00 2001 From: Jacobtread Date: Sun, 13 Oct 2024 17:37:10 +1300 Subject: [PATCH] refactor: replace queue lock with tokio mutex Tokio mutex guarantees FIFO ordering based on the documentation so it can be used instead of the custom queue lock --- src/session/mod.rs | 37 ++++++++------ src/utils/lock.rs | 125 --------------------------------------------- src/utils/mod.rs | 1 - 3 files changed, 22 insertions(+), 141 deletions(-) delete mode 100644 src/utils/lock.rs diff --git a/src/session/mod.rs b/src/session/mod.rs index 18f13f7..03321b0 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -23,7 +23,6 @@ use crate::{ session::models::{NetworkAddress, QosNetworkData}, utils::{ components::{component_key, user_sessions, DEBUG_IGNORED_PACKETS}, - lock::{QueueLock, QueueLockGuard, TicketAcquireFuture}, types::{GameID, PlayerID}, }, }; @@ -44,7 +43,7 @@ use std::{ task::{ready, Context, Poll}, }; use std::{future::Future, sync::Weak}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, OwnedMutexGuard}; use tokio_util::codec::Framed; pub mod models; @@ -67,8 +66,8 @@ pub struct Session { pub association: Option, /// Lock for handling packets with a session, ensures only one packet is - /// processed at a time and in the same order that it was received - busy_lock: QueueLock, + /// processed at a time and in the same order that it was received / sent + busy_lock: Arc>, /// Sender for sending packets to the session tx: mpsc::UnboundedSender, @@ -82,16 +81,18 @@ pub struct Session { #[derive(Clone)] pub struct SessionNotifyHandle { - busy_lock: QueueLock, + busy_lock: Arc>, tx: mpsc::UnboundedSender, } impl SessionNotifyHandle { - /// Pushes a new notification packet, this will acquire a queue position - /// waiting until the current response is handled before sending + /// Pushes a new notification packet pub fn notify(&self, packet: Packet) { let tx = self.tx.clone(); - let busy_lock = self.busy_lock.acquire(); + + // Acquire the lock position before scheduling the task to ensure correct ordering + let busy_lock = self.busy_lock.clone().lock_owned(); + tokio::spawn(async move { let _guard = busy_lock.await; let _ = tx.send(packet); @@ -243,7 +244,7 @@ impl Session { let session = Arc::new(Self { id, association, - busy_lock: QueueLock::new(), + busy_lock: Default::default(), tx, data: Default::default(), addr, @@ -526,14 +527,14 @@ enum ReadState<'a> { /// Acquiring a lock guard Acquire { /// Future for the locking guard - ticket: TicketAcquireFuture, + lock_future: BoxFuture<'static, OwnedMutexGuard<()>>, /// The packet that was read packet: Option, }, /// Future for a handler is being polled Handle { /// Locking guard - guard: QueueLockGuard, + guard: OwnedMutexGuard<()>, /// Handle future future: BoxFuture<'a, Packet>, }, @@ -601,9 +602,12 @@ impl SessionFuture<'_> { let result = ready!(Pin::new(&mut self.io).poll_next(cx)); if let Some(Ok(packet)) = result { - let ticket = self.session.busy_lock.acquire(); + let lock_future = self.session.busy_lock.clone().lock_owned(); + let lock_future: BoxFuture<'static, OwnedMutexGuard<()>> = + Box::pin(lock_future); + self.read_state = ReadState::Acquire { - ticket, + lock_future, packet: Some(packet), } } else { @@ -611,8 +615,11 @@ impl SessionFuture<'_> { self.stop = true; } } - ReadState::Acquire { ticket, packet } => { - let guard = ready!(Pin::new(ticket).poll(cx)); + ReadState::Acquire { + lock_future, + packet, + } => { + let guard = ready!(Pin::new(lock_future).poll(cx)); let packet = packet .take() .expect("Unexpected acquire state without packet"); diff --git a/src/utils/lock.rs b/src/utils/lock.rs deleted file mode 100644 index 5464548..0000000 --- a/src/utils/lock.rs +++ /dev/null @@ -1,125 +0,0 @@ -use log::warn; -use std::future::Future; -use std::pin::Pin; -use std::sync::{atomic::AtomicUsize, Arc}; -use std::task::{ready, Context, Poll}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tokio_util::sync::PollSemaphore; - -/// Lock with strict ordering for permits, maintains strict -/// FIFO ordering -#[derive(Clone)] -pub struct QueueLock { - inner: Arc, -} - -impl QueueLock { - pub fn new() -> QueueLock { - let inner = QueueLockInner { - semaphore: Arc::new(Semaphore::new(1)), - next_ticket: AtomicUsize::new(1), - current_ticket: AtomicUsize::new(1), - }; - - QueueLock { - inner: Arc::new(inner), - } - } - - /// Acquire a ticket for the queue, returns a future - /// which completes when its the tickets turn to access - pub fn acquire(&self) -> TicketAcquireFuture { - let ticket = self - .inner - .next_ticket - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); - let poll = PollSemaphore::new(self.inner.semaphore.clone()); - - TicketAcquireFuture { - inner: self.inner.clone(), - poll, - ticket, - } - } -} - -struct QueueLockInner { - /// Underlying async acquisition primitive - semaphore: Arc, - /// The next ticket to provide access to - next_ticket: AtomicUsize, - /// The current ticket allowed access - current_ticket: AtomicUsize, -} - -/// Future while waiting to acquire its lock -/// -/// TODO: If these futures are dropped early then -/// the lock wont be able to unlock, figure out how -/// to fix this..? -pub struct TicketAcquireFuture { - /// The queue lock being waited on - inner: Arc, - /// Semaphore that can be polled - poll: PollSemaphore, - /// The ticket for this queue position - ticket: usize, -} - -impl Drop for TicketAcquireFuture { - fn drop(&mut self) { - let current = self - .inner - .current_ticket - .load(std::sync::atomic::Ordering::SeqCst); - - // Ensure we are the ticket that is allowed - if current != self.ticket { - warn!("Early dropped ticket acquire {}", self.ticket); - } - } -} - -/// Guard which releases the queue lock when dropped -pub struct QueueLockGuard { - /// Acquisition permit - _permit: OwnedSemaphorePermit, - inner: Arc, -} - -impl Future for TicketAcquireFuture { - type Output = QueueLockGuard; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - let permit = ready!(this.poll.poll_acquire(cx)).expect("Queue task semaphore was closed"); - - let current = this - .inner - .current_ticket - .load(std::sync::atomic::Ordering::SeqCst); - - // Ensure we are the ticket that is allowed - if current == this.ticket { - Poll::Ready(QueueLockGuard { - _permit: permit, - inner: this.inner.clone(), - }) - } else { - // Make sure this future is polled again when possible - // TODO: Is this okay to do?? (Tokio defers their version but thats internal crate access) - cx.waker().wake_by_ref(); - - Poll::Pending - } - } -} - -impl Drop for QueueLockGuard { - fn drop(&mut self) { - // Set the current ticket to the next ticket - self.inner - .current_ticket - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); - } -} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 191c985..cda760b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,7 +1,6 @@ pub mod components; pub mod encoding; pub mod hashing; -pub mod lock; pub mod logging; pub mod parsing; pub mod random_name;