Skip to content

Commit

Permalink
time: make test-util paused time fully deterministic (#3492)
Browse files Browse the repository at this point in the history
The time driver stores an Instant internally used as a "base" for future
time calculations. Since this is generated as the Runtime is being
constructed, it previously always happened before the user had a chance
to pause time. The fractional-millisecond variations in the timing
around the runtime construction and time pause cause tests running
entirely in paused time to be very slightly deterministic, with the time
driver advancing time by 1 millisecond more or less depending on how the
sub-millisecond components of the `Instant`s involved compared.

To avoid this, there is now a new option on `runtime::Builder` which
will create a `Runtime` with time "instantly" paused. This, along with a
small change to have the time driver use the provided clock as the
source for its start time allow totally deterministic tests with paused
time.
  • Loading branch information
sfackler authored Feb 5, 2021
1 parent 1c1e0e3 commit fcb6d04
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 36 deletions.
4 changes: 2 additions & 2 deletions tests-build/tests/fail/macros_invalid_input.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration
4 | fn main_is_not_async() {}
| ^^

error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
--> $DIR/macros_invalid_input.rs:6:15
|
6 | #[tokio::main(foo)]
Expand All @@ -28,7 +28,7 @@ error: the test function cannot accept arguments
16 | async fn test_fn_has_args(_x: u8) {}
| ^^^^^^

error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
--> $DIR/macros_invalid_input.rs:18:15
|
18 | #[tokio::test(foo)]
Expand Down
79 changes: 62 additions & 17 deletions tokio-macros/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ impl RuntimeFlavor {
struct FinalConfig {
flavor: RuntimeFlavor,
worker_threads: Option<usize>,
start_paused: Option<bool>,
}

struct Configuration {
rt_multi_thread_available: bool,
default_flavor: RuntimeFlavor,
flavor: Option<RuntimeFlavor>,
worker_threads: Option<(usize, Span)>,
start_paused: Option<(bool, Span)>,
}

impl Configuration {
Expand All @@ -44,6 +46,7 @@ impl Configuration {
},
flavor: None,
worker_threads: None,
start_paused: None,
}
}

Expand Down Expand Up @@ -79,31 +82,57 @@ impl Configuration {
Ok(())
}

fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> {
if self.start_paused.is_some() {
return Err(syn::Error::new(span, "`start_paused` set multiple times."));
}

let start_paused = parse_bool(start_paused, span, "start_paused")?;
self.start_paused = Some((start_paused, span));
Ok(())
}

fn build(&self) -> Result<FinalConfig, syn::Error> {
let flavor = self.flavor.unwrap_or(self.default_flavor);
use RuntimeFlavor::*;
match (flavor, self.worker_threads) {
(CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new(
worker_threads_span,
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
)),
(CurrentThread, None) => Ok(FinalConfig {
flavor,
worker_threads: None,
}),
(Threaded, worker_threads) if self.rt_multi_thread_available => Ok(FinalConfig {
flavor,
worker_threads: worker_threads.map(|(val, _span)| val),
}),

let worker_threads = match (flavor, self.worker_threads) {
(CurrentThread, Some((_, worker_threads_span))) => {
return Err(syn::Error::new(
worker_threads_span,
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
))
}
(CurrentThread, None) => None,
(Threaded, worker_threads) if self.rt_multi_thread_available => {
worker_threads.map(|(val, _span)| val)
}
(Threaded, _) => {
let msg = if self.flavor.is_none() {
"The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled."
} else {
"The runtime flavor `multi_thread` requires the `rt-multi-thread` feature."
};
Err(syn::Error::new(Span::call_site(), msg))
return Err(syn::Error::new(Span::call_site(), msg));
}
}
};

let start_paused = match (flavor, self.start_paused) {
(Threaded, Some((_, start_paused_span))) => {
return Err(syn::Error::new(
start_paused_span,
"The `start_paused` option requires the `current_thread` runtime flavor.",
));
}
(CurrentThread, Some((start_paused, _))) => Some(start_paused),
(_, None) => None,
};

Ok(FinalConfig {
flavor,
worker_threads,
start_paused,
})
}
}

Expand Down Expand Up @@ -134,6 +163,16 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result<String, syn::E
}
}

fn parse_bool(bool: syn::Lit, span: Span, field: &str) -> Result<bool, syn::Error> {
match bool {
syn::Lit::Bool(b) => Ok(b.value),
_ => Err(syn::Error::new(
span,
format!("Failed to parse {} as bool.", field),
)),
}
}

fn parse_knobs(
mut input: syn::ItemFn,
args: syn::AttributeArgs,
Expand Down Expand Up @@ -174,6 +213,9 @@ fn parse_knobs(
"flavor" => {
config.set_flavor(namevalue.lit.clone(), namevalue.span())?;
}
"start_paused" => {
config.set_start_paused(namevalue.lit.clone(), namevalue.span())?;
}
"core_threads" => {
let msg = "Attribute `core_threads` is renamed to `worker_threads`";
return Err(syn::Error::new_spanned(namevalue, msg));
Expand Down Expand Up @@ -204,11 +246,11 @@ fn parse_knobs(
macro_name
)
}
"flavor" | "worker_threads" => {
"flavor" | "worker_threads" | "start_paused" => {
format!("The `{}` attribute requires an argument.", name)
}
name => {
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name)
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`", name)
}
};
return Err(syn::Error::new_spanned(path, msg));
Expand All @@ -235,6 +277,9 @@ fn parse_knobs(
if let Some(v) = config.worker_threads {
rt = quote! { #rt.worker_threads(#v) };
}
if let Some(v) = config.start_paused {
rt = quote! { #rt.start_paused(#v) };
}

let header = {
if is_test {
Expand Down
33 changes: 33 additions & 0 deletions tokio-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ use proc_macro::TokenStream;
/// }
/// ```
///
/// ### Configure the runtime to start with time paused
///
/// ```rust
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// println!("Hello world");
/// }
/// ```
///
/// Equivalent code not using `#[tokio::main]`
///
/// ```rust
/// fn main() {
/// tokio::runtime::Builder::new_current_thread()
/// .enable_all()
/// .start_paused(true)
/// .build()
/// .unwrap()
/// .block_on(async {
/// println!("Hello world");
/// })
/// }
/// ```
///
/// ### NOTE:
///
/// If you rename the Tokio crate in your dependencies this macro will not work.
Expand Down Expand Up @@ -225,6 +249,15 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream {
/// }
/// ```
///
/// ### Configure the runtime to start with time paused
///
/// ```no_run
/// #[tokio::test(start_paused = true)]
/// async fn my_test() {
/// assert!(true);
/// }
/// ```
///
/// ### NOTE:
///
/// If you rename the Tokio crate in your dependencies this macro will not work.
Expand Down
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
proptest = "0.10.0"
rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"

Expand Down
32 changes: 32 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Builder {
/// Whether or not to enable the time driver
enable_time: bool,

/// Whether or not the clock should start paused.
start_paused: bool,

/// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
Expand Down Expand Up @@ -110,6 +113,9 @@ impl Builder {
// Time defaults to "off"
enable_time: false,

// The clock starts not-paused
start_paused: false,

// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,

Expand Down Expand Up @@ -386,6 +392,7 @@ impl Builder {
},
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
}
}

Expand Down Expand Up @@ -489,6 +496,31 @@ cfg_time! {
}
}

cfg_test_util! {
impl Builder {
/// Controls if the runtime's clock starts paused or advancing.
///
/// Pausing time requires the current-thread runtime; construction of
/// the runtime will panic otherwise.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_current_thread()
/// .enable_time()
/// .start_paused(true)
/// .build()
/// .unwrap();
/// ```
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
self.start_paused = start_paused;
self
}
}
}

cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ cfg_time! {
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;

fn create_clock(enable_pausing: bool) -> Clock {
crate::time::Clock::new(enable_pausing)
fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
crate::time::Clock::new(enable_pausing, start_paused)
}

fn create_time_driver(
Expand All @@ -131,7 +131,7 @@ cfg_not_time! {
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();

fn create_clock(_enable_pausing: bool) -> Clock {
fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
()
}

Expand Down Expand Up @@ -162,13 +162,15 @@ pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
}

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;

let clock = create_clock(cfg.enable_pause_time);
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());

Expand Down
15 changes: 11 additions & 4 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cfg_not_test_util! {
}

impl Clock {
pub(crate) fn new(_enable_pausing: bool) -> Clock {
pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
Clock {}
}

Expand Down Expand Up @@ -78,7 +78,8 @@ cfg_test_util! {
/// that depend on time.
///
/// Pausing time requires the `current_thread` Tokio runtime. This is the
/// default runtime used by `#[tokio::test]`
/// default runtime used by `#[tokio::test]`. The runtime can be initialized
/// with time in a paused state using the `Builder::start_paused` method.
///
/// # Panics
///
Expand Down Expand Up @@ -149,16 +150,22 @@ cfg_test_util! {
impl Clock {
/// Return a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new(enable_pausing: bool) -> Clock {
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
let now = std::time::Instant::now();

Clock {
let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
})),
};

if start_paused {
clock.pause();
}

clock
}

pub(crate) fn pause(&self) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ pub(self) struct ClockTime {
impl ClockTime {
pub(self) fn new(clock: Clock) -> Self {
Self {
start_time: clock.now(),
clock,
start_time: super::clock::now(),
}
}

Expand Down
Loading

0 comments on commit fcb6d04

Please sign in to comment.