Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of notify_last method #6520

Merged
merged 19 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ struct Waiter {
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
waker: UnsafeCell<Option<Waker>>,

/// Notification for this waiter.
/// Notification for this waiter. Uses 2 bits to store if and how was
/// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
/// the rest of it is unused.
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
/// * if it's `Some`, then `waker` is exclusively owned by the
/// enclosing `Waiter` and can be accessed without locking.
Expand Down Expand Up @@ -253,13 +255,16 @@ generate_addr_of_methods! {
}

// No notification.
const NOTIFICATION_NONE: usize = 0;
const NOTIFICATION_NONE: usize = 0b000;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 1;
const NOTIFICATION_ONE: usize = 0b001;

// Notification type used by `notify_last`.
const NOTIFICATION_LAST: usize = 0b101;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;
const NOTIFICATION_ALL: usize = 0b010;

/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
Expand All @@ -275,13 +280,20 @@ impl AtomicNotification {
/// Store-release a notification.
/// This method should be called exactly once.
fn store_release(&self, notification: Notification) {
self.0.store(notification as usize, Release);
let data: usize = match notification {
Notification::All => NOTIFICATION_ALL,
Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
};
self.0.store(data, Release);
}

fn load(&self, ordering: Ordering) -> Option<Notification> {
match self.0.load(ordering) {
let data = self.0.load(ordering);
match data {
NOTIFICATION_NONE => None,
NOTIFICATION_ONE => Some(Notification::One),
NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
NOTIFICATION_ALL => Some(Notification::All),
_ => unreachable!(),
}
Expand All @@ -296,11 +308,18 @@ impl AtomicNotification {
}
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
Fifo,
Lifo,
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
One = NOTIFICATION_ONE,
All = NOTIFICATION_ALL,
One(NotifyOneStrategy),
All,
}

/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
Expand Down Expand Up @@ -521,7 +540,7 @@ impl Notify {
}
}

/// Notifies a waiting task.
/// Notifies the first waiting task.
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
Expand Down Expand Up @@ -558,6 +577,23 @@ impl Notify {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "notify"))]
pub fn notify_one(&self) {
self.notify_with_strategy(NotifyOneStrategy::Fifo);
}

/// Notifies the last waiting task.
///
/// This function behaves similar to `notify_one`. The only difference is that it wakes
/// the most recently added waiter instead of the oldest waiter.
///
/// Check the [`notify_one()`] documentation for more info and
/// examples.
///
/// [`notify_one()`]: Notify::notify_one
pub fn notify_last(&self) {
self.notify_with_strategy(NotifyOneStrategy::Lifo);
}

fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
// Load the current state
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -585,7 +621,7 @@ impl Notify {
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);

if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
}
Expand Down Expand Up @@ -708,7 +744,12 @@ impl Default for Notify {
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}

fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
fn notify_locked(
waiters: &mut WaitList,
state: &AtomicUsize,
curr: usize,
strategy: NotifyOneStrategy,
) -> Option<Waker> {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
Expand All @@ -728,8 +769,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter
let waiter = waiters.pop_back().unwrap();
// Get a pending waiter using one of the available dequeue strategies.
let waiter = match strategy {
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
};

// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
Expand All @@ -738,7 +782,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::One);
waiter
.notification
.store_release(Notification::One(strategy));

if waiters.is_empty() {
// As this the **final** waiter in the list, the state
Expand Down Expand Up @@ -1137,8 +1183,10 @@ impl Drop for Notified<'_> {
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
if let Some(Notification::One(strategy)) = notification {
if let Some(waker) =
notify_locked(&mut waiters, &notify.state, notify_state, strategy)
{
drop(waiters);
waker.wake();
}
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ impl<L: Link> LinkedList<L, L::Target> {
}
}

/// Removes the first element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_front(&mut self) -> Option<L::Handle> {
unsafe {
let head = self.head?;
self.head = L::pointers(head).as_ref().get_next();

if let Some(new_head) = L::pointers(head).as_ref().get_next() {
L::pointers(new_head).as_mut().set_prev(None);
} else {
self.tail = None;
}

L::pointers(head).as_mut().set_prev(None);
L::pointers(head).as_mut().set_next(None);

Some(L::from_raw(head))
}
}

/// Removes the last element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
Expand Down
75 changes: 75 additions & 0 deletions tokio/tests/sync_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,38 @@ fn notify_notified_one() {
assert_ready!(notified.poll());
}

#[test]
fn notify_multi_notified_one() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the first one
notify.notify_one();
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_multi_notified_last() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the last one
notify.notify_last();
assert_pending!(notified1.poll());
assert_ready!(notified2.poll());
}

#[test]
fn notified_one_notify() {
let notify = Notify::new();
Expand Down Expand Up @@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
assert_ready!(notified2.poll());
}

#[test]
fn notified_multi_notify_one_drop() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

// by default fifo
notify.notify_one();

drop(notified1);

// next waiter should be the one to be to woken up
assert_ready!(notified2.poll());
assert_pending!(notified3.poll());
}

#[test]
fn notified_multi_notify_last_drop() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

notify.notify_last();

drop(notified3);

// latest waiter added should be the one to woken up
assert_ready!(notified2.poll());
assert_pending!(notified1.poll());
}

#[test]
fn notify_in_drop_after_wake() {
use futures::task::ArcWake;
Expand Down
Loading