diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6f87715dd34..f24bb03fb4f 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -6,6 +6,7 @@ use crate::runtime::park::CachedParkThread; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::{bounded, list, unbounded}; use crate::sync::notify::Notify; +use crate::util::cacheline::CachePadded; use std::fmt; use std::process; @@ -46,18 +47,18 @@ pub(crate) trait Semaphore { } pub(super) struct Chan { + /// Handle to the push half of the lock-free list. + tx: CachePadded>, + + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: CachePadded, + /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, - /// Handle to the push half of the lock-free list. - tx: list::Tx, - /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver waker. Notified when a value is pushed into the channel. - rx_waker: AtomicWaker, - /// Tracks the number of outstanding sender handles. /// /// When this drops to zero, the send half of the channel is closed. @@ -73,9 +74,9 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Chan") - .field("tx", &self.tx) + .field("tx", &*self.tx) .field("semaphore", &self.semaphore) - .field("rx_waker", &self.rx_waker) + .field("rx_waker", &*self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -108,9 +109,9 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { let chan = Arc::new(Chan { notify_rx_closed: Notify::new(), - tx, + tx: CachePadded::new(tx), semaphore, - rx_waker: AtomicWaker::new(), + rx_waker: CachePadded::new(AtomicWaker::new()), tx_count: AtomicUsize::new(1), rx_fields: UnsafeCell::new(RxFields { list: rx, diff --git a/tokio/src/util/cacheline.rs b/tokio/src/util/cacheline.rs new file mode 100644 index 00000000000..64fd5ccad33 --- /dev/null +++ b/tokio/src/util/cacheline.rs @@ -0,0 +1,95 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] +use std::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub(crate) struct CachePadded { + value: T, +} + +impl CachePadded { + /// Pads and aligns a value to the length of a cache line. + pub(crate) fn new(value: T) -> CachePadded { + CachePadded:: { value } + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index dc997f4e67a..7d4cd5f9c7c 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -75,3 +75,5 @@ pub(crate) mod error; pub(crate) mod memchr; pub(crate) mod markers; + +pub(crate) mod cacheline;