Skip to content

Commit

Permalink
runtime: cleanup and add config options (#1807)
Browse files Browse the repository at this point in the history
* runtime: cleanup and add config options

This patch finishes the cleanup as part of the transition to Tokio 0.2.
A number of changes were made to take advantage of having all Tokio
types in a single crate. Also, fixes using Tokio types from
`spawn_blocking`.

* Many threads, one resource driver

Previously, in the threaded scheduler, a resource driver (mio::Poll /
timer combo) was created per thread. This was more or less fine, except
it required balancing across the available drivers. When using a
resource driver from **outside** of the thread pool, balancing is
tricky. The change was original done to avoid having a dedicated driver
thread.

Now, instead of creating many resource drivers, a single resource driver
is used. Each scheduler thread will attempt to "lock" the resource
driver before parking on it. If the resource driver is already locked,
the thread uses a condition variable to park. Contention should remain
low as, under load, the scheduler avoids using the drivers.

* Add configuration options to enable I/O / time

New configuration options are added to `runtime::Builder` to allow
enabling I/O and time drivers on a runtime instance basis. This is
useful when wanting to create lightweight runtime instances to execute
compute only tasks.

* Bug fixes

The condition variable parker is updated to the same algorithm used in
`std`. This is motivated by some potential deadlock cases discovered by
`loom`.

The basic scheduler is fixed to fairly schedule tasks. `push_front` was
accidentally used instead of `push_back`.

I/O, time, and spawning now work from within `spawn_blocking` closures.

* Misc cleanup

The threaded scheduler is no longer generic over `P :Park`. Instead, it
is hard coded to a specific parker. Tests, including loom tests, are
updated to use `Runtime` directly. This provides greater coverage.

The `blocking` module is moved back into `runtime` as all usage is
within `runtime` itself.
  • Loading branch information
carllerche authored Nov 22, 2019
1 parent 6866fe4 commit 8546ff8
Show file tree
Hide file tree
Showing 67 changed files with 1,657 additions and 1,358 deletions.
2 changes: 2 additions & 0 deletions tokio-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
fn #name(#inputs) #ret {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
.block_on(async { #body })
Expand Down Expand Up @@ -211,6 +212,7 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
fn #name() #ret {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
.block_on(async { #body })
Expand Down
2 changes: 2 additions & 0 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(loom))]

//! A mock type implementing [`AsyncRead`] and [`AsyncWrite`].
//!
//!
Expand Down
7 changes: 6 additions & 1 deletion tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! Tokio and Futures based testing utilites
pub mod io;

mod macros;
pub mod task;

Expand All @@ -27,7 +28,11 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;

let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();

rt.block_on(future)
}
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.14", optional = true }
mio = { version = "0.6.20", optional = true }
num_cpus = { version = "1.8.0", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(loom))]

//! Asynchronous file and standard stream adaptation.
//!
//! This module contains utility methods and adapter types for input/output to
Expand Down Expand Up @@ -96,6 +98,6 @@ mod sys {
pub(crate) use std::fs::File;

// TODO: don't rename
pub(crate) use crate::blocking::spawn_blocking as run;
pub(crate) use crate::runtime::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking;
}
2 changes: 1 addition & 1 deletion tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::{Park, Unpark};
use crate::park::{Park, Unpark};
use crate::util::slab::{Address, Slab};

use mio::event::Evented;
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(loom, allow(dead_code, unreachable_pub))]

//! Asynchronous I/O.
//!
//! This module is the asynchronous version of `std::io`. Primarily, it
Expand Down Expand Up @@ -204,7 +206,7 @@ cfg_io_blocking! {
/// Types in this module can be mocked out in tests.
mod sys {
// TODO: don't rename
pub(crate) use crate::blocking::spawn_blocking as run;
pub(crate) use crate::runtime::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking;
}
}
5 changes: 1 addition & 4 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,17 @@
#[macro_use]
mod macros;

// Blocking task implementation
pub(crate) mod blocking;

cfg_fs! {
pub mod fs;
}

mod future;

pub mod io;

pub mod net;

mod loom;
mod park;

pub mod prelude;

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![cfg_attr(not(feature = "full"), allow(unused_imports, dead_code))]
#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]

mod atomic_u32;
mod atomic_u64;
Expand Down Expand Up @@ -45,8 +45,8 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;

pub(crate) use std::sync::atomic::spin_loop_hint;
pub(crate) use std::sync::atomic::{fence, AtomicPtr};
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}
}

Expand Down
8 changes: 6 additions & 2 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#![allow(unused_macros)]

macro_rules! cfg_atomic_waker {
macro_rules! cfg_resource_drivers {
($($item:item)*) => {
$( #[cfg(any(feature = "io-driver", feature = "time"))] $item )*
$(
#[cfg(any(feature = "io-driver", feature = "time"))]
#[cfg(not(loom))]
$item
)*
}
}

Expand Down
8 changes: 4 additions & 4 deletions tokio/src/net/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ cfg_dns! {
type Future = sealed::MaybeReady;

fn to_socket_addrs(&self) -> Self::Future {
use crate::blocking;
use crate::runtime::spawn_blocking;
use sealed::MaybeReady;

// First check if the input parses as a socket address
Expand All @@ -137,7 +137,7 @@ cfg_dns! {
// Run DNS lookup on the blocking pool
let s = self.to_owned();

MaybeReady::Blocking(blocking::spawn_blocking(move || {
MaybeReady::Blocking(spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&s)
}))
}
Expand All @@ -152,7 +152,7 @@ cfg_dns! {
type Future = sealed::MaybeReady;

fn to_socket_addrs(&self) -> Self::Future {
use crate::blocking;
use crate::runtime::spawn_blocking;
use sealed::MaybeReady;

let (host, port) = *self;
Expand All @@ -174,7 +174,7 @@ cfg_dns! {

let host = host.to_owned();

MaybeReady::Blocking(blocking::spawn_blocking(move || {
MaybeReady::Blocking(spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port))
}))
}
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(loom))]

//! TCP/UDP/Unix bindings for `tokio`.
//!
//! This module contains the TCP/UDP/Unix networking types, similar to the standard
Expand Down
65 changes: 65 additions & 0 deletions tokio/src/park/either.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::park::{Park, Unpark};

use std::fmt;
use std::time::Duration;

pub(crate) enum Either<A, B> {
A(A),
B(B),
}

impl<A, B> Park for Either<A, B>
where
A: Park,
B: Park,
{
type Unpark = Either<A::Unpark, B::Unpark>;
type Error = Either<A::Error, B::Error>;

fn unpark(&self) -> Self::Unpark {
match self {
Either::A(a) => Either::A(a.unpark()),
Either::B(b) => Either::B(b.unpark()),
}
}

fn park(&mut self) -> Result<(), Self::Error> {
match self {
Either::A(a) => a.park().map_err(Either::A),
Either::B(b) => b.park().map_err(Either::B),
}
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
match self {
Either::A(a) => a.park_timeout(duration).map_err(Either::A),
Either::B(b) => b.park_timeout(duration).map_err(Either::B),
}
}
}

impl<A, B> Unpark for Either<A, B>
where
A: Unpark,
B: Unpark,
{
fn unpark(&self) {
match self {
Either::A(a) => a.unpark(),
Either::B(b) => b.unpark(),
}
}
}

impl<A, B> fmt::Debug for Either<A, B>
where
A: fmt::Debug,
B: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Either::A(a) => a.fmt(fmt),
Either::B(b) => b.fmt(fmt),
}
}
}
16 changes: 11 additions & 5 deletions tokio/src/runtime/park/mod.rs → tokio/src/park/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@
//! [up]: trait.Unpark.html
//! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html
cfg_resource_drivers! {
mod either;
pub(crate) use self::either::Either;
}

mod thread;
#[cfg(feature = "rt-threaded")]
pub(crate) use self::thread::CachedParkThread;
#[cfg(not(feature = "io-driver"))]
pub(crate) use self::thread::ParkThread;

cfg_blocking_impl! {
pub(crate) use self::thread::CachedParkThread;
}

use std::sync::Arc;
use std::time::Duration;

Expand All @@ -58,7 +64,7 @@ use std::time::Duration;
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
pub trait Park {
pub(crate) trait Park {
/// Unpark handle type for the `Park` implementation.
type Unpark: Unpark;

Expand Down Expand Up @@ -112,7 +118,7 @@ pub trait Park {
///
/// [mod]: ../index.html
/// [`Park`]: trait.Park.html
pub trait Unpark: Sync + Send + 'static {
pub(crate) trait Unpark: Sync + Send + 'static {
/// Unblock a thread that is blocked by the associated `Park` handle.
///
/// Calling `unpark` atomically makes available the unpark token, if it is
Expand Down
Loading

0 comments on commit 8546ff8

Please sign in to comment.