From fcb6d041b9d2fe567b5306e648cbb048b426a49d Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Fri, 5 Feb 2021 14:12:25 -0500 Subject: [PATCH] time: make test-util paused time fully deterministic (#3492) The time driver stores an Instant internally used as a "base" for future time calculations. Since this is generated as the Runtime is being constructed, it previously always happened before the user had a chance to pause time. The fractional-millisecond variations in the timing around the runtime construction and time pause cause tests running entirely in paused time to be very slightly deterministic, with the time driver advancing time by 1 millisecond more or less depending on how the sub-millisecond components of the `Instant`s involved compared. To avoid this, there is now a new option on `runtime::Builder` which will create a `Runtime` with time "instantly" paused. This, along with a small change to have the time driver use the provided clock as the source for its start time allow totally deterministic tests with paused time. --- .../tests/fail/macros_invalid_input.stderr | 4 +- tokio-macros/src/entry.rs | 79 +++++++++++++++---- tokio-macros/src/lib.rs | 33 ++++++++ tokio/Cargo.toml | 1 + tokio/src/runtime/builder.rs | 32 ++++++++ tokio/src/runtime/driver.rs | 10 ++- tokio/src/time/clock.rs | 15 +++- tokio/src/time/driver/mod.rs | 2 +- tokio/src/time/driver/tests/mod.rs | 12 +-- tokio/src/util/mod.rs | 4 +- tokio/tests/time_pause.rs | 26 ++++++ 11 files changed, 182 insertions(+), 36 deletions(-) diff --git a/tests-build/tests/fail/macros_invalid_input.stderr b/tests-build/tests/fail/macros_invalid_input.stderr index 4c68bd93f6c..bba2009352d 100644 --- a/tests-build/tests/fail/macros_invalid_input.stderr +++ b/tests-build/tests/fail/macros_invalid_input.stderr @@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration 4 | fn main_is_not_async() {} | ^^ -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` --> $DIR/macros_invalid_input.rs:6:15 | 6 | #[tokio::main(foo)] @@ -28,7 +28,7 @@ error: the test function cannot accept arguments 16 | async fn test_fn_has_args(_x: u8) {} | ^^^^^^ -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` --> $DIR/macros_invalid_input.rs:18:15 | 18 | #[tokio::test(foo)] diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index ae8c0b9d9f9..f82a329af16 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -25,6 +25,7 @@ impl RuntimeFlavor { struct FinalConfig { flavor: RuntimeFlavor, worker_threads: Option, + start_paused: Option, } struct Configuration { @@ -32,6 +33,7 @@ struct Configuration { default_flavor: RuntimeFlavor, flavor: Option, worker_threads: Option<(usize, Span)>, + start_paused: Option<(bool, Span)>, } impl Configuration { @@ -44,6 +46,7 @@ impl Configuration { }, flavor: None, worker_threads: None, + start_paused: None, } } @@ -79,31 +82,57 @@ impl Configuration { Ok(()) } + fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.start_paused.is_some() { + return Err(syn::Error::new(span, "`start_paused` set multiple times.")); + } + + let start_paused = parse_bool(start_paused, span, "start_paused")?; + self.start_paused = Some((start_paused, span)); + Ok(()) + } + fn build(&self) -> Result { let flavor = self.flavor.unwrap_or(self.default_flavor); use RuntimeFlavor::*; - match (flavor, self.worker_threads) { - (CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new( - worker_threads_span, - "The `worker_threads` option requires the `multi_thread` runtime flavor.", - )), - (CurrentThread, None) => Ok(FinalConfig { - flavor, - worker_threads: None, - }), - (Threaded, worker_threads) if self.rt_multi_thread_available => Ok(FinalConfig { - flavor, - worker_threads: worker_threads.map(|(val, _span)| val), - }), + + let worker_threads = match (flavor, self.worker_threads) { + (CurrentThread, Some((_, worker_threads_span))) => { + return Err(syn::Error::new( + worker_threads_span, + "The `worker_threads` option requires the `multi_thread` runtime flavor.", + )) + } + (CurrentThread, None) => None, + (Threaded, worker_threads) if self.rt_multi_thread_available => { + worker_threads.map(|(val, _span)| val) + } (Threaded, _) => { let msg = if self.flavor.is_none() { "The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled." } else { "The runtime flavor `multi_thread` requires the `rt-multi-thread` feature." }; - Err(syn::Error::new(Span::call_site(), msg)) + return Err(syn::Error::new(Span::call_site(), msg)); } - } + }; + + let start_paused = match (flavor, self.start_paused) { + (Threaded, Some((_, start_paused_span))) => { + return Err(syn::Error::new( + start_paused_span, + "The `start_paused` option requires the `current_thread` runtime flavor.", + )); + } + (CurrentThread, Some((start_paused, _))) => Some(start_paused), + (_, None) => None, + }; + + Ok(FinalConfig { + flavor, + worker_threads, + start_paused, + }) } } @@ -134,6 +163,16 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result Result { + match bool { + syn::Lit::Bool(b) => Ok(b.value), + _ => Err(syn::Error::new( + span, + format!("Failed to parse {} as bool.", field), + )), + } +} + fn parse_knobs( mut input: syn::ItemFn, args: syn::AttributeArgs, @@ -174,6 +213,9 @@ fn parse_knobs( "flavor" => { config.set_flavor(namevalue.lit.clone(), namevalue.span())?; } + "start_paused" => { + config.set_start_paused(namevalue.lit.clone(), namevalue.span())?; + } "core_threads" => { let msg = "Attribute `core_threads` is renamed to `worker_threads`"; return Err(syn::Error::new_spanned(namevalue, msg)); @@ -204,11 +246,11 @@ fn parse_knobs( macro_name ) } - "flavor" | "worker_threads" => { + "flavor" | "worker_threads" | "start_paused" => { format!("The `{}` attribute requires an argument.", name) } name => { - format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name) + format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`", name) } }; return Err(syn::Error::new_spanned(path, msg)); @@ -235,6 +277,9 @@ fn parse_knobs( if let Some(v) = config.worker_threads { rt = quote! { #rt.worker_threads(#v) }; } + if let Some(v) = config.start_paused { + rt = quote! { #rt.start_paused(#v) }; + } let header = { if is_test { diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index b48bd004967..1c8e29282d4 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -144,6 +144,30 @@ use proc_macro::TokenStream; /// } /// ``` /// +/// ### Configure the runtime to start with time paused +/// +/// ```rust +/// #[tokio::main(flavor = "current_thread", start_paused = true)] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[tokio::main]` +/// +/// ```rust +/// fn main() { +/// tokio::runtime::Builder::new_current_thread() +/// .enable_all() +/// .start_paused(true) +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// /// ### NOTE: /// /// If you rename the Tokio crate in your dependencies this macro will not work. @@ -225,6 +249,15 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// } /// ``` /// +/// ### Configure the runtime to start with time paused +/// +/// ```no_run +/// #[tokio::test(start_paused = true)] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// /// ### NOTE: /// /// If you rename the Tokio crate in your dependencies this macro will not work. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 1e719a235d0..eff72587194 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" } tokio-stream = { version = "0.1", path = "../tokio-stream" } futures = { version = "0.3.0", features = ["async-await"] } proptest = "0.10.0" +rand = "0.8.0" tempfile = "3.1.0" async-stream = "0.3" diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 1f8892eafc8..e845192977e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -47,6 +47,9 @@ pub struct Builder { /// Whether or not to enable the time driver enable_time: bool, + /// Whether or not the clock should start paused. + start_paused: bool, + /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. @@ -110,6 +113,9 @@ impl Builder { // Time defaults to "off" enable_time: false, + // The clock starts not-paused + start_paused: false, + // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, @@ -386,6 +392,7 @@ impl Builder { }, enable_io: self.enable_io, enable_time: self.enable_time, + start_paused: self.start_paused, } } @@ -489,6 +496,31 @@ cfg_time! { } } +cfg_test_util! { + impl Builder { + /// Controls if the runtime's clock starts paused or advancing. + /// + /// Pausing time requires the current-thread runtime; construction of + /// the runtime will panic otherwise. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_time() + /// .start_paused(true) + /// .build() + /// .unwrap(); + /// ``` + pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { + self.start_paused = start_paused; + self + } + } +} + cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index b89fa4fca79..a0e8e2362b7 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -103,8 +103,8 @@ cfg_time! { pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; - fn create_clock(enable_pausing: bool) -> Clock { - crate::time::Clock::new(enable_pausing) + fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock { + crate::time::Clock::new(enable_pausing, start_paused) } fn create_time_driver( @@ -131,7 +131,7 @@ cfg_not_time! { pub(crate) type Clock = (); pub(crate) type TimeHandle = (); - fn create_clock(_enable_pausing: bool) -> Clock { + fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock { () } @@ -162,13 +162,15 @@ pub(crate) struct Cfg { pub(crate) enable_io: bool, pub(crate) enable_time: bool, pub(crate) enable_pause_time: bool, + pub(crate) start_paused: bool, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - let clock = create_clock(cfg.enable_pause_time); + let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); + let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, clock.clone()); diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index a62fbe39009..8957800cbb5 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -17,7 +17,7 @@ cfg_not_test_util! { } impl Clock { - pub(crate) fn new(_enable_pausing: bool) -> Clock { + pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock { Clock {} } @@ -78,7 +78,8 @@ cfg_test_util! { /// that depend on time. /// /// Pausing time requires the `current_thread` Tokio runtime. This is the - /// default runtime used by `#[tokio::test]` + /// default runtime used by `#[tokio::test]`. The runtime can be initialized + /// with time in a paused state using the `Builder::start_paused` method. /// /// # Panics /// @@ -149,16 +150,22 @@ cfg_test_util! { impl Clock { /// Return a new `Clock` instance that uses the current execution context's /// source of time. - pub(crate) fn new(enable_pausing: bool) -> Clock { + pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { let now = std::time::Instant::now(); - Clock { + let clock = Clock { inner: Arc::new(Mutex::new(Inner { enable_pausing, base: now, unfrozen: Some(now), })), + }; + + if start_paused { + clock.pause(); } + + clock } pub(crate) fn pause(&self) { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 9fbc0b3cf96..615307ea572 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -102,8 +102,8 @@ pub(self) struct ClockTime { impl ClockTime { pub(self) fn new(clock: Clock) -> Self { Self { + start_time: clock.now(), clock, - start_time: super::clock::now(), } } diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index cfefed32f72..8ae4a84b442 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) { #[test] fn single_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -72,7 +72,7 @@ fn single_timer() { #[test] fn drop_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -103,7 +103,7 @@ fn drop_timer() { #[test] fn change_waker() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -138,7 +138,7 @@ fn reset_future() { model(|| { let finished_early = Arc::new(AtomicBool::new(false)); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -185,7 +185,7 @@ fn reset_future() { #[test] #[cfg(not(loom))] fn poll_process_levels() { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); @@ -226,7 +226,7 @@ fn poll_process_levels() { fn poll_process_levels_targeted() { let mut context = Context::from_waker(noop_waker_ref()); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 92f67af2f25..b267125b15b 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -24,7 +24,7 @@ cfg_rt! { } cfg_rt_multi_thread! { - pub(crate) use rand::FastRand; + pub(crate) use self::rand::FastRand; mod try_lock; pub(crate) use try_lock::TryLock; @@ -34,7 +34,7 @@ pub(crate) mod trace; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use rand::thread_rng_n; +pub use self::rand::thread_rng_n; #[cfg(any( feature = "rt", diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index 49a7677f5c8..bc84ac578d0 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use rand::SeedableRng; +use rand::{rngs::StdRng, Rng}; +use tokio::time::{self, Duration, Instant}; use tokio_test::assert_err; #[tokio::test] @@ -31,3 +34,26 @@ async fn pause_time_in_spawn_threads() { assert_err!(t.await); } + +#[test] +fn paused_time_is_deterministic() { + let run_1 = paused_time_stress_run(); + let run_2 = paused_time_stress_run(); + + assert_eq!(run_1, run_2); +} + +#[tokio::main(flavor = "current_thread", start_paused = true)] +async fn paused_time_stress_run() -> Vec { + let mut rng = StdRng::seed_from_u64(1); + + let mut times = vec![]; + let start = Instant::now(); + for _ in 0..10_000 { + let sleep = rng.gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + time::sleep(sleep).await; + times.push(start.elapsed()); + } + + times +}