Skip to content

Commit

Permalink
miri: make miri accept our intrusive linked lists (#4397)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored Feb 9, 2022
1 parent ca51f6a commit 1be8e9d
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 61 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,13 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v1
- name: miri
run: |
set -e
rm -rf tests
cargo miri test --features rt,rt-multi-thread,sync task
# Many of tests in tokio/tests and doctests use #[tokio::test] or
# #[tokio::main] that calls epoll_create1 that Miri does not support.
run: cargo miri test --features full --lib --no-fail-fast
working-directory: tokio
env:
MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-tag-raw-pointers
PROPTEST_CASES: 10

san:
name: san
Expand Down
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ LOOM_MAX_PREEMPTIONS=1 RUSTFLAGS="--cfg loom" \
cargo test --lib --release --features full -- --test-threads=1 --nocapture
```

You can run miri tests with
```
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-tag-raw-pointers" PROPTEST_CASES=10 \
cargo +nightly miri test --features full --lib
```

### Tests

If the change being proposed alters code (as opposed to only documentation for
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/fs/file/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ fn flush_while_idle() {
}

#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn read_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;
Expand Down Expand Up @@ -299,6 +300,7 @@ fn read_with_buffer_larger_than_max() {
}

#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn write_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;
Expand Down
1 change: 1 addition & 0 deletions tokio/src/process/unix/orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ pub(crate) mod test {
drop(signal_guard);
}

#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ where
}
}

fn header_ptr(&self) -> NonNull<Header> {
self.cell.cast()
}

fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
Expand Down Expand Up @@ -93,7 +97,8 @@ where

match self.header().state.transition_to_running() {
TransitionToRunning::Success => {
let waker_ref = waker_ref::<T, S>(self.header());
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let res = poll_future(&self.core().stage, cx);

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ cfg_rt_multi_thread! {

impl<S: 'static> Task<S> {
fn into_raw(self) -> NonNull<Header> {
let ret = self.header().into();
let ret = self.raw.header_ptr();
mem::forget(self);
ret
}
Expand Down Expand Up @@ -427,7 +427,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
type Target = Header;

fn as_raw(handle: &Task<S>) -> NonNull<Header> {
handle.header().into()
handle.raw.header_ptr()
}

unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl RawTask {
RawTask { ptr }
}

pub(super) fn header_ptr(&self) -> NonNull<Header> {
self.ptr
}

/// Returns a reference to the task's meta structure.
///
/// Safe as `Header` is `Sync`.
Expand Down
10 changes: 5 additions & 5 deletions tokio/src/runtime/task/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(super) struct WakerRef<'a, S: 'static> {

/// Returns a `WakerRef` which avoids having to pre-emptively increase the
/// refcount if there is no need to do so.
pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
pub(super) fn waker_ref<T, S>(header: &NonNull<Header>) -> WakerRef<'_, S>
where
T: Future,
S: Schedule,
Expand All @@ -28,7 +28,7 @@ where
// point and not an *owned* waker, we must ensure that `drop` is never
// called on this waker instance. This is done by wrapping it with
// `ManuallyDrop` and then never calling drop.
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(*header))) };

WakerRef {
waker,
Expand Down Expand Up @@ -77,7 +77,7 @@ where
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.clone");
(*header).state.ref_inc();
raw_waker::<T, S>(header)
raw_waker::<T, S>(ptr)
}

unsafe fn drop_waker<T, S>(ptr: *const ())
Expand Down Expand Up @@ -114,12 +114,12 @@ where
harness.wake_by_ref();
}

fn raw_waker<T, S>(header: *const Header) -> RawWaker
fn raw_waker<T, S>(header: NonNull<Header>) -> RawWaker
where
T: Future,
S: Schedule,
{
let ptr = header as *const ();
let ptr = header.as_ptr() as *const ();
let vtable = &RawWakerVTable::new(
clone_waker::<T, S>,
wake_by_val::<T, S>,
Expand Down
20 changes: 14 additions & 6 deletions tokio/src/runtime/tests/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,21 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}

const fn normal_or_miri(normal: usize, miri: usize) -> usize {
if cfg!(miri) {
miri
} else {
normal
}
}

#[test]
fn stress1() {
const NUM_ITER: usize = 1;
const NUM_STEAL: usize = 1_000;
const NUM_LOCAL: usize = 1_000;
const NUM_PUSH: usize = 500;
const NUM_POP: usize = 250;
const NUM_STEAL: usize = normal_or_miri(1_000, 10);
const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
const NUM_PUSH: usize = normal_or_miri(500, 10);
const NUM_POP: usize = normal_or_miri(250, 10);

let mut metrics = MetricsBatch::new();

Expand Down Expand Up @@ -169,8 +177,8 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
const NUM_TASKS: usize = 1_000_000;
const NUM_STEAL: usize = 1_000;
const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
const NUM_STEAL: usize = normal_or_miri(1_000, 10);

let mut metrics = MetricsBatch::new();

Expand Down
7 changes: 6 additions & 1 deletion tokio/src/signal/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ mod tests {
registry.broadcast();

// Yield so the previous broadcast can get received
crate::time::sleep(std::time::Duration::from_millis(10)).await;
//
// This yields many times since the block_on task is only polled every 61
// ticks.
for _ in 0..100 {
crate::task::yield_now().await;
}

// Send subsequent signal
registry.record_event(0);
Expand Down
1 change: 1 addition & 0 deletions tokio/src/signal/reusable_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl<T> fmt::Debug for ReusableBoxFuture<T> {
}

#[cfg(test)]
#[cfg(not(miri))] // Miri breaks when you use Pin<&mut dyn Future>
mod test {
use super::ReusableBoxFuture;
use futures::future::FutureExt;
Expand Down
5 changes: 3 additions & 2 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ enum NotificationType {
}

#[derive(Debug)]
#[repr(C)] // required by `linked_list::Link` impl
struct Waiter {
/// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
Expand Down Expand Up @@ -731,8 +732,8 @@ unsafe impl linked_list::Link for Waiter {
ptr
}

unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
target.cast()
}
}

Expand Down
34 changes: 34 additions & 0 deletions tokio/src/sync/tests/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,37 @@ fn notify_clones_waker_before_lock() {
// The result doesn't matter, we're just testing that we don't deadlock.
let _ = future.poll(&mut cx);
}

#[test]
fn notify_simple() {
let notify = Notify::new();

let mut fut1 = tokio_test::task::spawn(notify.notified());
assert!(fut1.poll().is_pending());

let mut fut2 = tokio_test::task::spawn(notify.notified());
assert!(fut2.poll().is_pending());

notify.notify_waiters();

assert!(fut1.poll().is_ready());
assert!(fut2.poll().is_ready());
}

#[test]
#[cfg(not(target_arch = "wasm32"))]
fn watch_test() {
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();

rt.block_on(async {
let (tx, mut rx) = crate::sync::watch::channel(());

crate::spawn(async move {
let _ = tx.send(());
});

let _ = rx.changed().await;
});
}
24 changes: 13 additions & 11 deletions tokio/src/time/driver/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,16 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, Ti
///
/// Note that this structure is located inside the `TimerEntry` structure.
#[derive(Debug)]
#[repr(C)] // required by `link_list::Link` impl
pub(crate) struct TimerShared {
/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,

/// Current state. This records whether the timer entry is currently under
/// the ownership of the driver, and if not, its current state (not
/// complete, fired, error, etc).
state: StateCell,

/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,

_p: PhantomPinned,
}

Expand Down Expand Up @@ -420,20 +421,21 @@ impl TimerShared {
/// padded. This contains the information that the driver thread accesses most
/// frequently to minimize contention. In particular, we move it away from the
/// waker, as the waker is updated on every poll.
#[repr(C)] // required by `link_list::Link` impl
struct TimerSharedPadded {
/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: linked_list::Pointers<TimerShared>,

/// The expiration time for which this entry is currently registered.
/// Generally owned by the driver, but is accessed by the entry when not
/// registered.
cached_when: AtomicU64,

/// The true expiration time. Set by the timer future, read by the driver.
true_when: AtomicU64,

/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>,
}

impl std::fmt::Debug for TimerSharedPadded {
Expand All @@ -450,7 +452,7 @@ impl TimerSharedPadded {
Self {
cached_when: AtomicU64::new(0),
true_when: AtomicU64::new(0),
pointers: StdUnsafeCell::new(linked_list::Pointers::new()),
pointers: linked_list::Pointers::new(),
}
}
}
Expand All @@ -474,7 +476,7 @@ unsafe impl linked_list::Link for TimerShared {
unsafe fn pointers(
target: NonNull<Self::Target>,
) -> NonNull<linked_list::Pointers<Self::Target>> {
unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() }
target.cast()
}
}

Expand Down
20 changes: 17 additions & 3 deletions tokio/src/time/driver/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
return loom::future::block_on(f);

#[cfg(not(loom))]
return futures::executor::block_on(f);
{
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(f)
}
}

fn model(f: impl Fn() + Send + Sync + 'static) {
Expand Down Expand Up @@ -182,6 +187,15 @@ fn reset_future() {
})
}

#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
if cfg!(miri) {
miri
} else {
normal
}
}

#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
Expand All @@ -195,7 +209,7 @@ fn poll_process_levels() {

let mut entries = vec![];

for i in 0..1024 {
for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle,
clock.now() + Duration::from_millis(i),
Expand All @@ -208,7 +222,7 @@ fn poll_process_levels() {
entries.push(entry);
}

for t in 1..1024 {
for t in 1..normal_or_miri(1024, 64) {
handle.process_at_time(t as u64);
for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref());
Expand Down
Loading

0 comments on commit 1be8e9d

Please sign in to comment.