diff --git a/tokio-reactor/Cargo.toml b/tokio-reactor/Cargo.toml
index be92447739f..077d7a08f9a 100644
--- a/tokio-reactor/Cargo.toml
+++ b/tokio-reactor/Cargo.toml
@@ -18,9 +18,16 @@ Event loop that drives Tokio I/O resources.
 categories = ["asynchronous", "network-programming"]
 
 [dependencies]
+crossbeam-utils = "0.5.0"
 futures = "0.1.19"
+lazy_static = "1.0.2"
 log = "0.4.1"
 mio = "0.6.14"
+num_cpus = "1.8.0"
+parking_lot = "0.6.3"
 slab = "0.4.0"
 tokio-executor = { version = "0.1.1", path = "../tokio-executor" }
 tokio-io = { version = "0.1.6", path = "../tokio-io" }
+
+[dev-dependencies]
+tokio = { version = "0.1.7", path = ".." }
diff --git a/tokio-reactor/examples/bench-poll.rs b/tokio-reactor/examples/bench-poll.rs
new file mode 100644
index 00000000000..922ced5f36d
--- /dev/null
+++ b/tokio-reactor/examples/bench-poll.rs
@@ -0,0 +1,45 @@
+extern crate futures;
+extern crate mio;
+extern crate tokio;
+extern crate tokio_reactor;
+
+use futures::Async;
+use mio::Ready;
+use tokio::prelude::*;
+use tokio_reactor::Registration;
+
+const NUM_FUTURES: usize = 1000;
+const NUM_STEPS: usize = 1000;
+
+fn main() {
+    tokio::run(future::lazy(|| {
+        for _ in 0..NUM_FUTURES {
+            let (r, s) = mio::Registration::new2();
+            let registration = Registration::new();
+            registration.register(&r).unwrap();
+
+            let mut r = Some(r);
+            let mut step = 0;
+
+            tokio::spawn(future::poll_fn(move || {
+                loop {
+                    let is_ready = registration.poll_read_ready().unwrap().is_ready();
+
+                    if is_ready {
+                        step += 1;
+
+                        if step == NUM_STEPS {
+                            r.take().unwrap();
+                            return Ok(Async::Ready(()));
+                        }
+                    } else {
+                        s.set_readiness(Ready::readable()).unwrap();
+                        return Ok(Async::NotReady);
+                    }
+                }
+            }));
+        }
+
+        Ok(())
+    }));
+}
diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs
index 713deff54bb..b6aca9e57c8 100644
--- a/tokio-reactor/src/lib.rs
+++ b/tokio-reactor/src/lib.rs
@@ -30,11 +30,16 @@
 #![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.2")]
 #![deny(missing_docs, warnings, missing_debug_implementations)]
 
+extern crate crossbeam_utils;
 #[macro_use]
 extern crate futures;
 #[macro_use]
+extern crate lazy_static;
+#[macro_use]
 extern crate log;
 extern crate mio;
+extern crate num_cpus;
+extern crate parking_lot;
 extern crate slab;
 extern crate tokio_executor;
 extern crate tokio_io;
@@ -46,6 +51,7 @@ mod atomic_task;
 pub(crate) mod background;
 mod poll_evented;
 mod registration;
+mod sharded_rwlock;
 
 // ===== Public re-exports =====
 
@@ -56,6 +62,7 @@ pub use self::poll_evented::PollEvented;
 // ===== Private imports =====
 
 use atomic_task::AtomicTask;
+use sharded_rwlock::RwLock;
 use tokio_executor::Enter;
 use tokio_executor::park::{Park, Unpark};
 
@@ -66,7 +73,7 @@ use std::mem;
 use std::cell::RefCell;
 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
-use std::sync::{Arc, Weak, RwLock};
+use std::sync::{Arc, Weak};
 use std::time::{Duration, Instant};
 
 use log::Level;
@@ -334,7 +341,7 @@ impl Reactor {
     /// either successfully or with an error.
     pub fn is_idle(&self) -> bool {
         self.inner.io_dispatch
-            .read().unwrap()
+            .read()
            .is_empty()
     }
 
@@ -397,7 +404,7 @@ impl Reactor {
         // Create a scope to ensure that notifying the tasks stays out of the
         // lock's critical section.
         {
-            let io_dispatch = self.inner.io_dispatch.read().unwrap();
+            let io_dispatch = self.inner.io_dispatch.read();
 
             let io = match io_dispatch.get(token) {
                 Some(io) => io,
@@ -641,7 +648,7 @@ impl Inner {
         // Get an ABA guard value
         let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);
 
-        let mut io_dispatch = self.io_dispatch.write().unwrap();
+        let mut io_dispatch = self.io_dispatch.write();
 
         if io_dispatch.len() == MAX_SOURCES {
             return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \
@@ -671,13 +678,13 @@ impl Inner {
 
     fn drop_source(&self, token: usize) {
         debug!("dropping I/O source: {}", token);
-        self.io_dispatch.write().unwrap().remove(token);
+        self.io_dispatch.write().remove(token);
     }
 
     /// Registers interest in the I/O resource associated with `token`.
     fn register(&self, token: usize, dir: Direction, t: Task) {
         debug!("scheduling direction for: {}", token);
-        let io_dispatch = self.io_dispatch.read().unwrap();
+        let io_dispatch = self.io_dispatch.read();
         let sched = io_dispatch.get(token).unwrap();
 
         let (task, ready) = match dir {
@@ -698,7 +705,7 @@ impl Drop for Inner {
         // When a reactor is dropped it needs to wake up all blocked tasks as
         // they'll never receive a notification, and all connected I/O objects
         // will start returning errors pretty quickly.
-        let io = self.io_dispatch.read().unwrap();
+        let io = self.io_dispatch.read();
         for (_, io) in io.iter() {
             io.writer.notify();
             io.reader.notify();
diff --git a/tokio-reactor/src/registration.rs b/tokio-reactor/src/registration.rs
index 278b57680c1..981dd5524bd 100644
--- a/tokio-reactor/src/registration.rs
+++ b/tokio-reactor/src/registration.rs
@@ -518,7 +518,7 @@ impl Inner {
         let mask = direction.mask();
         let mask_no_hup = (mask - ::platform::hup()).as_usize();
 
-        let io_dispatch = inner.io_dispatch.read().unwrap();
+        let io_dispatch = inner.io_dispatch.read();
         let sched = &io_dispatch[self.token];
 
         // This consumes the current readiness state **except** for HUP. HUP is
diff --git a/tokio-reactor/src/sharded_rwlock.rs b/tokio-reactor/src/sharded_rwlock.rs
new file mode 100644
index 00000000000..0908961342f
--- /dev/null
+++ b/tokio-reactor/src/sharded_rwlock.rs
@@ -0,0 +1,216 @@
+//! A scalable reader-writer lock.
+//!
+//! This implementation makes read operations faster and more scalable due to less contention,
+//! while making write operations slower. It also incurs much higher memory overhead than
+//! traditional reader-writer locks.
+
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use std::marker::PhantomData;
+use std::mem;
+use std::ops::{Deref, DerefMut};
+use std::sync::Mutex;
+use std::thread::{self, ThreadId};
+
+use crossbeam_utils::CachePadded;
+use num_cpus;
+use parking_lot;
+
+/// A scalable reader-writer lock.
+///
+/// This type of lock allows a number of readers or at most one writer at any point in time. The
+/// write portion of this lock typically allows modification of the underlying data (exclusive
+/// access) and the read portion of this lock typically allows for read-only access (shared
+/// access).
+///
+/// This reader-writer lock differs from typical implementations in that it internally creates a
+/// list of reader-writer locks called 'shards'. Shards are aligned and padded to the cache line
+/// size.
+///
+/// Read operations lock only one shard specific to the current thread, while write operations lock
+/// every shard in succession. This strategy makes concurrent read operations faster due to less
+/// contention, but write operations are slower due to the increased amount of locking.
+pub struct RwLock<T> {
+    /// A list of locks protecting the internal data.
+    shards: Vec<CachePadded<parking_lot::RwLock<()>>>,
+
+    /// The internal data.
+    value: UnsafeCell<T>,
+}
+
+unsafe impl<T: Send> Send for RwLock<T> {}
+unsafe impl<T: Send + Sync> Sync for RwLock<T> {}
+
+impl<T> RwLock<T> {
+    /// Creates a new `RwLock` initialized with `value`.
+    pub fn new(value: T) -> RwLock<T> {
+        // The number of shards is a power of two so that the modulo operation in `read` becomes a
+        // simple bitwise "and".
+        let num_shards = num_cpus::get().next_power_of_two();
+
+        RwLock {
+            shards: (0..num_shards)
+                .map(|_| CachePadded::new(parking_lot::RwLock::new(())))
+                .collect(),
+            value: UnsafeCell::new(value),
+        }
+    }
+
+    /// Locks this `RwLock` with shared read access, blocking the current thread until it can be
+    /// acquired.
+    ///
+    /// The calling thread will be blocked until there are no more writers which hold the lock.
+    /// There may be other readers currently inside the lock when this method returns. This method
+    /// does not provide any guarantees with respect to the ordering of whether contentious readers
+    /// or writers will acquire the lock first.
+    ///
+    /// Returns an RAII guard which will release this thread's shared access once it is dropped.
+    pub fn read(&self) -> RwLockReadGuard<T> {
+        // Take the current thread index and map it to a shard index. Thread indices will tend to
+        // distribute shards among threads equally, thus reducing contention due to read-locking.
+        let shard_index = thread_index() & (self.shards.len() - 1);
+
+        RwLockReadGuard {
+            parent: self,
+            _guard: self.shards[shard_index].read(),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Locks this `RwLock` with exclusive write access, blocking the current thread until it can be
+    /// acquired.
+    ///
+    /// This function will not return while other writers or other readers currently have access to
+    /// the lock.
+    ///
+    /// Returns an RAII guard which will drop the write access of this `RwLock` when dropped.
+    pub fn write(&self) -> RwLockWriteGuard<T> {
+        // Write-lock each shard in succession.
+        for shard in &self.shards {
+            // The write guard is forgotten, but the lock will be manually unlocked in `drop`.
+            mem::forget(shard.write());
+        }
+
+        RwLockWriteGuard {
+            parent: self,
+            _marker: PhantomData,
+        }
+    }
+}
+
+/// A guard used to release the shared read access of a `RwLock` when dropped.
+pub struct RwLockReadGuard<'a, T: 'a> {
+    parent: &'a RwLock<T>,
+    _guard: parking_lot::RwLockReadGuard<'a, ()>,
+    _marker: PhantomData<parking_lot::RwLockReadGuard<'a, T>>,
+}
+
+unsafe impl<'a, T: Sync> Sync for RwLockReadGuard<'a, T> {}
+
+impl<'a, T> Deref for RwLockReadGuard<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        unsafe { &*self.parent.value.get() }
+    }
+}
+
+/// A guard used to release the exclusive write access of a `RwLock` when dropped.
+pub struct RwLockWriteGuard<'a, T: 'a> {
+    parent: &'a RwLock<T>,
+    _marker: PhantomData<parking_lot::RwLockWriteGuard<'a, T>>,
+}
+
+unsafe impl<'a, T: Sync> Sync for RwLockWriteGuard<'a, T> {}
+
+impl<'a, T> Drop for RwLockWriteGuard<'a, T> {
+    fn drop(&mut self) {
+        // Unlock the shards in reverse order of locking.
+        for shard in self.parent.shards.iter().rev() {
+            unsafe {
+                shard.force_unlock_write();
+            }
+        }
+    }
+}
+
+impl<'a, T> Deref for RwLockWriteGuard<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        unsafe { &*self.parent.value.get() }
+    }
+}
+
+impl<'a, T> DerefMut for RwLockWriteGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        unsafe { &mut *self.parent.value.get() }
+    }
+}
+
+/// Returns a `usize` that identifies the current thread.
+///
+/// Each thread is associated with an 'index'. While there are no particular guarantees, indices
+/// usually tend to be consecutive numbers between 0 and the number of running threads.
+#[inline]
+pub fn thread_index() -> usize {
+    REGISTRATION.with(|reg| reg.index)
+}
+
+/// The global registry keeping track of registered threads and indices.
+struct ThreadIndices {
+    /// Mapping from `ThreadId` to thread index.
+    mapping: HashMap<ThreadId, usize>,
+
+    /// A list of free indices.
+    free_list: Vec<usize>,
+
+    /// The next index to allocate if the free list is empty.
+    next_index: usize,
+}
+
+lazy_static! {
+    static ref THREAD_INDICES: Mutex<ThreadIndices> = Mutex::new(ThreadIndices {
+        mapping: HashMap::new(),
+        free_list: Vec::new(),
+        next_index: 0,
+    });
+}
+
+/// A registration of a thread with an index.
+///
+/// When dropped, unregisters the thread and frees the reserved index.
+struct Registration {
+    index: usize,
+    thread_id: ThreadId,
+}
+
+impl Drop for Registration {
+    fn drop(&mut self) {
+        let mut indices = THREAD_INDICES.lock().unwrap();
+        indices.mapping.remove(&self.thread_id);
+        indices.free_list.push(self.index);
+    }
+}
+
+thread_local! {
+    static REGISTRATION: Registration = {
+        let thread_id = thread::current().id();
+        let mut indices = THREAD_INDICES.lock().unwrap();
+
+        let index = match indices.free_list.pop() {
+            Some(i) => i,
+            None => {
+                let i = indices.next_index;
+                indices.next_index += 1;
+                i
+            }
+        };
+        indices.mapping.insert(thread_id, index);
+
+        Registration {
+            index,
+            thread_id,
+        }
+    };
+}
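
For orientation, here is a minimal crate-internal sketch (illustrative only, not part of the patch) of how the new lock is used by the call sites above. The `Slab<&'static str>` payload is a stand-in for whatever the reactor actually stores in `io_dispatch`, and the function name is made up for this note:

    use slab::Slab;
    use sharded_rwlock::RwLock;

    fn sketch() {
        // `new` builds one cache-padded parking_lot shard per CPU, rounded up to a power of two.
        let io_dispatch: RwLock<Slab<&'static str>> = RwLock::new(Slab::new());

        // `write()` locks every shard in succession, so it excludes all readers.
        let token = io_dispatch.write().insert("an I/O resource");

        // `read()` locks only the shard chosen by the calling thread's index, so readers on
        // different threads rarely contend with each other.
        assert!(io_dispatch.read().contains(token));

        io_dispatch.write().remove(token);
    }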
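
The comment in `RwLock::new` notes that the shard count is rounded up to a power of two so the shard lookup in `read` reduces to a single bitwise AND. A small stand-alone illustration of that identity (the helper name and sample values are made up for this note):

    fn shard_for(thread_index: usize, num_shards: usize) -> usize {
        // Equivalent to `thread_index % num_shards`, but only because `num_shards` is a power
        // of two, as guaranteed by `num_cpus::get().next_power_of_two()` in `RwLock::new`.
        debug_assert!(num_shards.is_power_of_two());
        thread_index & (num_shards - 1)
    }

    fn main() {
        assert_eq!(shard_for(5, 4), 5 % 4);   // shard 1
        assert_eq!(shard_for(12, 8), 12 % 8); // shard 4
        assert_eq!(shard_for(3, 8), 3);       // indices below the shard count map to themselves
    }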
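
The thread-index registry at the bottom of sharded_rwlock.rs hands out small integer ids and recycles them through a free list once a thread's `Registration` is dropped. A rough crate-internal sketch of the observable behaviour (illustrative only; `thread_index` is private to tokio-reactor, so this would only compile inside the crate):

    use std::thread;

    use sharded_rwlock::thread_index;

    fn demo() {
        // The first call on a thread registers it; later calls on the same thread return the
        // same index.
        let index = thread_index();
        assert_eq!(index, thread_index());

        // Another thread gets its own index: either the next counter value or one recycled
        // from the free list after an earlier thread exited.
        let other = thread::spawn(|| thread_index()).join().unwrap();
        println!("this thread: {}, spawned thread: {}", index, other);
    }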