Skip to content

Commit

Permalink
sync: new internal semaphore based on intrusive lists (#2325)
Browse files Browse the repository at this point in the history
## Motivation

Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public `Semaphore` type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.

Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), their waiter nodes remain in the list until they are
dequeued while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).

## Solution

@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list --- that entries may not move while in the list
--- may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring  separate heap allocation, and cancelled futures may remove
their nodes from the list.

This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically. 

The mutex is acquired only when accessing the wait list — if a task 
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable length number of waiters to notify.


Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different than that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.

Fixes #2237

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Mar 23, 2020
1 parent 2258de5 commit acf8a7d
Show file tree
Hide file tree
Showing 14 changed files with 1,574 additions and 101 deletions.
11 changes: 11 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ harness = false
name = "scheduler"
path = "scheduler.rs"
harness = false


[[bench]]
name = "sync_rwlock"
path = "sync_rwlock.rs"
harness = false

[[bench]]
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false
147 changes: 147 additions & 0 deletions benches/sync_rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bencher::{black_box, Bencher};
use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
for _ in 0..6 {
let read = lock.read().await;
black_box(read);
}
})
});
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone()))
};
j.unwrap();
})
});
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
tokio::join! {
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone())
};
})
});
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
let j = tokio::try_join! {
async move { drop(write); Ok(()) },
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
};
j.unwrap();
})
});
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

async fn task(lock: Arc<RwLock<()>>) {
let read = lock.read().await;
black_box(read);
}

let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
tokio::join! {
async move { drop(write) },
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
};
})
});
}

bencher::benchmark_group!(
sync_rwlock,
read_uncontended,
read_concurrent_uncontended,
read_concurrent_uncontended_multi,
read_concurrent_contended,
read_concurrent_contended_multi
);

bencher::benchmark_main!(sync_rwlock);
130 changes: 130 additions & 0 deletions benches/sync_semaphore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use bencher::Bencher;
use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
for _ in 0..6 {
let permit = s.acquire().await;
drop(permit);
}
})
});
}

async fn task(s: Arc<Semaphore>) {
let permit = s.acquire().await;
drop(permit);
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
});
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
});
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
});
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
});
}

bencher::benchmark_group!(
sync_semaphore,
uncontended,
uncontended_concurrent_multi,
uncontended_concurrent_single,
contended_concurrent_multi,
contended_concurrent_single
);

bencher::benchmark_main!(sync_semaphore);
70 changes: 70 additions & 0 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
// NOTE: The doctests in this module are ignored since the whole module is (currently) private.

use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Constant used to determine how much "work" a task is allowed to do without yielding.
Expand Down Expand Up @@ -250,6 +252,74 @@ pub async fn proceed() {
poll_fn(|cx| poll_proceed(cx)).await;
}

pin_project_lite::pin_project! {
/// A future that cooperatively yields to the task scheduler when polling,
/// if the task's budget is exhausted.
///
/// Internally, this is simply a future combinator which calls
/// [`poll_proceed`] in its `poll` implementation before polling the wrapped
/// future.
///
/// # Examples
///
/// ```rust,ignore
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::coop::CoopFutureExt;
///
/// async { /* ... */ }
/// .cooperate()
/// .await;
/// # }
/// ```
///
/// [`poll_proceed`]: fn.poll_proceed.html
#[derive(Debug)]
#[allow(unreachable_pub, dead_code)]
pub struct CoopFuture<F> {
#[pin]
future: F,
}
}

impl<F: Future> Future for CoopFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(poll_proceed(cx));
self.project().future.poll(cx)
}
}

impl<F: Future> CoopFuture<F> {
/// Returns a new `CoopFuture` wrapping the given future.
///
#[allow(unreachable_pub, dead_code)]
pub fn new(future: F) -> Self {
Self { future }
}
}

// Currently only used by `tokio::sync`; and if we make this combinator public,
// it should probably be on the `FutureExt` trait instead.
cfg_sync! {
/// Extension trait providing `Future::cooperate` extension method.
///
/// Note: if/when the co-op API becomes public, this method should probably be
/// provided by `FutureExt`, instead.
pub(crate) trait CoopFutureExt: Future {
/// Wrap `self` to cooperatively yield to the scheduler when polling, if the
/// task's budget is exhausted.
fn cooperate(self) -> CoopFuture<Self>
where
Self: Sized,
{
CoopFuture::new(self)
}
}

impl<F> CoopFutureExt for F where F: Future {}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit acf8a7d

Please sign in to comment.