Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make test-util paused time fully deterministic #3492

Merged
merged 10 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests-build/tests/fail/macros_invalid_input.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration
4 | fn main_is_not_async() {}
| ^^

error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
--> $DIR/macros_invalid_input.rs:6:15
|
6 | #[tokio::main(foo)]
Expand All @@ -28,7 +28,7 @@ error: the test function cannot accept arguments
16 | async fn test_fn_has_args(_x: u8) {}
| ^^^^^^

error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
--> $DIR/macros_invalid_input.rs:18:15
|
18 | #[tokio::test(foo)]
Expand Down
79 changes: 62 additions & 17 deletions tokio-macros/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ impl RuntimeFlavor {
struct FinalConfig {
flavor: RuntimeFlavor,
worker_threads: Option<usize>,
start_paused: Option<bool>,
}

struct Configuration {
rt_multi_thread_available: bool,
default_flavor: RuntimeFlavor,
flavor: Option<RuntimeFlavor>,
worker_threads: Option<(usize, Span)>,
start_paused: Option<(bool, Span)>,
}

impl Configuration {
Expand All @@ -44,6 +46,7 @@ impl Configuration {
},
flavor: None,
worker_threads: None,
start_paused: None,
}
}

Expand Down Expand Up @@ -79,31 +82,57 @@ impl Configuration {
Ok(())
}

fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> {
if self.start_paused.is_some() {
return Err(syn::Error::new(span, "`start_paused` set multiple times."));
}

let start_paused = parse_bool(start_paused, span, "start_paused")?;
self.start_paused = Some((start_paused, span));
Ok(())
}

fn build(&self) -> Result<FinalConfig, syn::Error> {
let flavor = self.flavor.unwrap_or(self.default_flavor);
use RuntimeFlavor::*;
match (flavor, self.worker_threads) {
(CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new(
worker_threads_span,
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
)),
(CurrentThread, None) => Ok(FinalConfig {
flavor,
worker_threads: None,
}),
(Threaded, worker_threads) if self.rt_multi_thread_available => Ok(FinalConfig {
flavor,
worker_threads: worker_threads.map(|(val, _span)| val),
}),

let worker_threads = match (flavor, self.worker_threads) {
(CurrentThread, Some((_, worker_threads_span))) => {
return Err(syn::Error::new(
worker_threads_span,
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
))
}
(CurrentThread, None) => None,
(Threaded, worker_threads) if self.rt_multi_thread_available => {
worker_threads.map(|(val, _span)| val)
}
(Threaded, _) => {
let msg = if self.flavor.is_none() {
"The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled."
} else {
"The runtime flavor `multi_thread` requires the `rt-multi-thread` feature."
};
Err(syn::Error::new(Span::call_site(), msg))
return Err(syn::Error::new(Span::call_site(), msg));
}
}
};

let start_paused = match (flavor, self.start_paused) {
(Threaded, Some((_, start_paused_span))) => {
return Err(syn::Error::new(
start_paused_span,
"The `start_paused` option requires the `current_thread` runtime flavor.",
));
}
(CurrentThread, Some((start_paused, _))) => Some(start_paused),
(_, None) => None,
};

Ok(FinalConfig {
flavor,
worker_threads,
start_paused,
})
}
}

Expand Down Expand Up @@ -134,6 +163,16 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result<String, syn::E
}
}

fn parse_bool(bool: syn::Lit, span: Span, field: &str) -> Result<bool, syn::Error> {
match bool {
syn::Lit::Bool(b) => Ok(b.value),
_ => Err(syn::Error::new(
span,
format!("Failed to parse {} as bool.", field),
)),
}
}

fn parse_knobs(
mut input: syn::ItemFn,
args: syn::AttributeArgs,
Expand Down Expand Up @@ -174,6 +213,9 @@ fn parse_knobs(
"flavor" => {
config.set_flavor(namevalue.lit.clone(), namevalue.span())?;
}
"start_paused" => {
config.set_start_paused(namevalue.lit.clone(), namevalue.span())?;
}
Comment on lines +216 to +218
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new attribute should be mentioned in the catch-all a few lines further down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"core_threads" => {
let msg = "Attribute `core_threads` is renamed to `worker_threads`";
return Err(syn::Error::new_spanned(namevalue, msg));
Expand Down Expand Up @@ -204,11 +246,11 @@ fn parse_knobs(
macro_name
)
}
"flavor" | "worker_threads" => {
"flavor" | "worker_threads" | "start_paused" => {
format!("The `{}` attribute requires an argument.", name)
}
name => {
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name)
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`", name)
}
};
return Err(syn::Error::new_spanned(path, msg));
Expand All @@ -235,6 +277,9 @@ fn parse_knobs(
if let Some(v) = config.worker_threads {
rt = quote! { #rt.worker_threads(#v) };
}
if let Some(v) = config.start_paused {
rt = quote! { #rt.start_paused(#v) };
}
Comment on lines +280 to +282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since making the test deterministic requires calling it on the builder, I agree that it should be added to the macro, but we should document it on both #[tokio::test] and tokio::time::pause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +280 to +282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this will fail to compile if used without the test-util feature, but making it detect that it the feature missing is somewhat cumbersome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - it looks like there's some detection for the multi-threaded runtime but it involves 2 macros and cfg'd imports in tokio which I don't think can easily scale to multiple features.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't personally feel super strongly about supporting this in the macros, fwiw. I'd be fine leaving it out and adjusting the test to manually make the runtime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just do e.g.:

if let Some(v) = config.start_paused {
  #[cfg(not(feature="test-util"))]
  {
    return Err(syn::Error::new_spanned(v.1, "start_paused requires feature test-util"));
  }
  rt = quote! { #rt.start_paused(#v) };
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a memory with there being some sort of issue with forwarding the features to the tokio-macros crate (recall, it is a separate crate), but I'm not sure what it was.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sfackler what do you think about @bdonlan’s suggestion? @Darksonn wonders if it would work, it is worth a shot imo!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll give it a try tonight. If it works, we could remove the main_rt and test_rt variants I think as well. Are those considered part of the crate's public API?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us just keep the others for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - the problem with that approach is that Cargo has no way currently to say "enable this feature on an optional dependency, but don't activate that dependency". So if we try to propagate features into tokio-macros, turning on any of those features in tokio automatically pulls in tokio-macros.


let header = {
if is_test {
Expand Down
33 changes: 33 additions & 0 deletions tokio-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ use proc_macro::TokenStream;
/// }
/// ```
///
/// ### Configure the runtime to start with time paused
///
/// ```rust
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// println!("Hello world");
/// }
/// ```
///
/// Equivalent code not using `#[tokio::main]`
///
/// ```rust
/// fn main() {
/// tokio::runtime::Builder::new_current_thread()
/// .enable_all()
/// .start_paused(true)
/// .build()
/// .unwrap()
/// .block_on(async {
/// println!("Hello world");
/// })
/// }
/// ```
///
/// ### NOTE:
///
/// If you rename the Tokio crate in your dependencies this macro will not work.
Expand Down Expand Up @@ -225,6 +249,15 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream {
/// }
/// ```
///
/// ### Configure the runtime to start with time paused
///
/// ```no_run
/// #[tokio::test(start_paused = true)]
/// async fn my_test() {
/// assert!(true);
/// }
/// ```
///
/// ### NOTE:
///
/// If you rename the Tokio crate in your dependencies this macro will not work.
Expand Down
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
proptest = "0.10.0"
rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"

Expand Down
32 changes: 32 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub struct Builder {
/// Whether or not to enable the time driver
enable_time: bool,

/// Whether or not the clock should start paused.
start_paused: bool,

/// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
Expand Down Expand Up @@ -110,6 +113,9 @@ impl Builder {
// Time defaults to "off"
enable_time: false,

// The clock starts not-paused
start_paused: false,

// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,

Expand Down Expand Up @@ -386,6 +392,7 @@ impl Builder {
},
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
}
}

Expand Down Expand Up @@ -489,6 +496,31 @@ cfg_time! {
}
}

cfg_test_util! {
impl Builder {
/// Controls if the runtime's clock starts paused or advancing.
///
/// Pausing time requires the current-thread runtime; construction of
/// the runtime will panic otherwise.
Comment on lines +503 to +504
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this panic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to clock.pause in Clock::new will panic when running on a multi threaded runtime.

///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_current_thread()
/// .enable_time()
/// .start_paused(true)
/// .build()
/// .unwrap();
/// ```
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
self.start_paused = start_paused;
self
}
}
}

cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ cfg_time! {
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;

fn create_clock(enable_pausing: bool) -> Clock {
crate::time::Clock::new(enable_pausing)
fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
crate::time::Clock::new(enable_pausing, start_paused)
}

fn create_time_driver(
Expand All @@ -131,7 +131,7 @@ cfg_not_time! {
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();

fn create_clock(_enable_pausing: bool) -> Clock {
fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
()
}

Expand Down Expand Up @@ -162,13 +162,15 @@ pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
}

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;

let clock = create_clock(cfg.enable_pause_time);
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());

Expand Down
15 changes: 11 additions & 4 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cfg_not_test_util! {
}

impl Clock {
pub(crate) fn new(_enable_pausing: bool) -> Clock {
pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
Clock {}
}

Expand Down Expand Up @@ -78,7 +78,8 @@ cfg_test_util! {
/// that depend on time.
///
/// Pausing time requires the `current_thread` Tokio runtime. This is the
/// default runtime used by `#[tokio::test]`
/// default runtime used by `#[tokio::test]`. The runtime can be initialized
/// with time in a paused state using the `Builder::start_paused` method.
///
/// # Panics
///
Expand Down Expand Up @@ -149,16 +150,22 @@ cfg_test_util! {
impl Clock {
/// Return a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new(enable_pausing: bool) -> Clock {
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
let now = std::time::Instant::now();

Clock {
let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
})),
};

if start_paused {
clock.pause();
}

clock
}

pub(crate) fn pause(&self) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ pub(self) struct ClockTime {
impl ClockTime {
pub(self) fn new(clock: Clock) -> Self {
Self {
start_time: clock.now(),
clock,
start_time: super::clock::now(),
}
}

Expand Down
Loading