Skip to content

Commit

Permalink
rt: remove Arc from Clock (#5434)
Browse files Browse the repository at this point in the history
This patch removes `Arc` from Tokio's internal clock source. Instead of
cloning `Clock` when needed, a reference is passed into functions that
need to get the current instant.
  • Loading branch information
carllerche authored Feb 7, 2023
1 parent a7945b4 commit abf5d28
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 62 deletions.
13 changes: 5 additions & 8 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ impl Driver {

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);

Ok((
Self { inner: time_driver },
Expand Down Expand Up @@ -111,10 +110,8 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

cfg_test_util! {
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
}
}
Expand Down Expand Up @@ -289,7 +286,7 @@ cfg_time! {
fn create_time_driver(
enable: bool,
io_stack: IoStack,
clock: Clock,
clock: &Clock,
) -> (TimeDriver, TimeHandle) {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
Expand Down Expand Up @@ -337,7 +334,7 @@ cfg_not_time! {
fn create_time_driver(
_enable: bool,
io_stack: IoStack,
_clock: Clock,
_clock: &Clock,
) -> (TimeDriver, TimeHandle) {
(io_stack, ())
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,11 @@ impl TimerEntry {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.driver().time()
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn clock(&self) -> &super::Clock {
self.driver.driver().clock()
}
}

impl TimerHandle {
Expand Down
16 changes: 9 additions & 7 deletions tokio/src/runtime/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Driver {
/// thread and `time_source` to get the current time and convert to ticks.
///
/// Specifying the source of time is useful when testing.
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

let handle = Handle {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Driver {

match next_wake {
Some(when) => {
let now = handle.time_source.now();
let now = handle.time_source.now(rt_handle.clock());
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
Expand Down Expand Up @@ -214,13 +214,13 @@ impl Driver {
}

// Process pending timers after waking up
handle.process();
handle.process(rt_handle.clock());
}

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
let clock = rt_handle.clock();

if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand All @@ -231,7 +231,9 @@ impl Driver {
// advance the clock.
if !handle.did_wake() {
// Simulate advancing time
clock.advance(duration);
if let Err(msg) = clock.advance(duration) {
panic!("{}", msg);
}
}
} else {
self.park.park_timeout(rt_handle, duration);
Expand All @@ -248,8 +250,8 @@ impl Driver {

impl Handle {
/// Runs timer related logic, and returns the next wakeup time
pub(self) fn process(&self) {
let now = self.time_source().now();
pub(self) fn process(&self, clock: &Clock) {
let now = self.time_source().now(clock);

self.process_at_time(now)
}
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/runtime/time/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use std::convert::TryInto;
/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
}

impl TimeSource {
pub(crate) fn new(clock: Clock) -> Self {
pub(crate) fn new(clock: &Clock) -> Self {
Self {
start_time: clock.now(),
clock,
}
}

Expand All @@ -36,7 +34,7 @@ impl TimeSource {
Duration::from_millis(t)
}

pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
pub(crate) fn now(&self, clock: &Clock) -> u64 {
self.instant_to_tick(clock.now())
}
}
15 changes: 9 additions & 6 deletions tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ fn single_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete
// synchronously.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -97,10 +98,11 @@ fn drop_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s in the future.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -132,10 +134,11 @@ fn change_waker() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down
103 changes: 69 additions & 34 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,40 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;

cfg_rt! {
fn clock() -> Option<Clock> {
#[track_caller]
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
use crate::runtime::Handle;

match Handle::try_current() {
Ok(handle) => Some(handle.inner.driver().clock().clone()),
Err(ref e) if e.is_missing_context() => None,
let res = match Handle::try_current() {
Ok(handle) => f(Some(handle.inner.driver().clock())),
Err(ref e) if e.is_missing_context() => f(None),
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
};

match res {
Ok(ret) => ret,
Err(msg) => panic!("{}", msg),
}
}
}

cfg_not_rt! {
fn clock() -> Option<Clock> {
None
#[track_caller]
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
match f(None) {
Ok(ret) => ret,
Err(msg) => panic!("{}", msg),
}
}
}

/// A handle to a source of time.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct Clock {
inner: Arc<Mutex<Inner>>,
inner: Mutex<Inner>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -107,8 +117,12 @@ cfg_test_util! {
/// [`advance`]: crate::time::advance
#[track_caller]
pub fn pause() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
with_clock(|maybe_clock| {
match maybe_clock {
Some(clock) => clock.pause(),
None => Err("time cannot be frozen from outside the Tokio runtime"),
}
})
}

/// Resumes time.
Expand All @@ -122,14 +136,21 @@ cfg_test_util! {
/// runtime.
#[track_caller]
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();
with_clock(|maybe_clock| {
let clock = match maybe_clock {
Some(clock) => clock,
None => return Err("time cannot be frozen from outside the Tokio runtime"),
};

if inner.unfrozen.is_some() {
panic!("time is not frozen");
}
let mut inner = clock.inner.lock();

inner.unfrozen = Some(std::time::Instant::now());
if inner.unfrozen.is_some() {
return Err("time is not frozen");
}

inner.unfrozen = Some(std::time::Instant::now());
Ok(())
})
}

/// Advances time.
Expand Down Expand Up @@ -164,19 +185,27 @@ cfg_test_util! {
///
/// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
with_clock(|maybe_clock| {
let clock = match maybe_clock {
Some(clock) => clock,
None => return Err("time cannot be frozen from outside the Tokio runtime"),
};

clock.advance(duration)
});

crate::task::yield_now().await;
}

/// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = clock() {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
with_clock(|maybe_clock| {
Ok(if let Some(clock) = maybe_clock {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
})
})
}

impl Clock {
Expand All @@ -186,34 +215,40 @@ cfg_test_util! {
let now = std::time::Instant::now();

let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
inner: Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
}),
};

if start_paused {
clock.pause();
if let Err(msg) = clock.pause() {
panic!("{}", msg);
}
}

clock
}

#[track_caller]
pub(crate) fn pause(&self) {
pub(crate) fn pause(&self) -> Result<(), &'static str> {
let mut inner = self.inner.lock();

if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
panic!("`time::pause()` requires the `current_thread` Tokio runtime. \
return Err("`time::pause()` requires the `current_thread` Tokio runtime. \
This is the default Runtime used by `#[tokio::test].");
}

let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed();
let elapsed = match inner.unfrozen.as_ref() {
Some(v) => v.elapsed(),
None => return Err("time is already frozen")
};
inner.base += elapsed;
inner.unfrozen = None;

Ok(())
}

/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
Expand All @@ -232,15 +267,15 @@ cfg_test_util! {
inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}

#[track_caller]
pub(crate) fn advance(&self, duration: Duration) {
pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> {
let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
return Err("time is not frozen");
}

inner.base += duration;
Ok(())
}

pub(crate) fn now(&self) -> Instant {
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ impl Sleep {

#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let clock = handle.driver().clock();
let handle = &handle.driver().time();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());
let duration = deadline_tick.saturating_sub(time_source.now(clock));

let location = location.expect("should have location if tracing");
let resource_span = tracing::trace_span!(
Expand Down Expand Up @@ -370,8 +371,9 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let clock = me.entry.clock();
let time_source = me.entry.driver().time_source();
let now = time_source.now();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
};
Expand Down

0 comments on commit abf5d28

Please sign in to comment.