Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guarantee that File::write_all writes all data (or at least tries) #4316

Merged
merged 18 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tokio/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
cfg_rt! {
pub(crate) use crate::runtime::spawn_blocking;

cfg_fs! {
#[allow(unused_imports)]
pub(crate) use crate::runtime::spawn_mandatory_blocking;
}

pub(crate) use crate::task::JoinHandle;
}

Expand All @@ -16,7 +22,16 @@ cfg_not_rt! {
{
assert_send_sync::<JoinHandle<std::cell::Cell<()>>>();
panic!("requires the `rt` Tokio feature flag")
}

cfg_fs! {
pub(crate) fn spawn_mandatory_blocking<F, R>(_f: F) -> Option<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
panic!("requires the `rt` Tokio feature flag")
}
}

pub(crate) struct JoinHandle<R> {
Expand Down
17 changes: 11 additions & 6 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ use std::task::Context;
use std::task::Poll;
use std::task::Poll::*;

#[cfg(test)]
use super::mocks::spawn_blocking;
#[cfg(test)]
use super::mocks::JoinHandle;
#[cfg(test)]
use super::mocks::MockFile as StdFile;
#[cfg(not(test))]
use crate::blocking::spawn_blocking;
#[cfg(test)]
use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
#[cfg(not(test))]
use crate::blocking::JoinHandle;
#[cfg(not(test))]
use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
#[cfg(not(test))]
use std::fs::File as StdFile;

/// A reference to an open file on the filesystem.
Expand Down Expand Up @@ -649,15 +649,20 @@ impl AsyncWrite for File {
let n = buf.copy_from(src);
let std = me.std.clone();

inner.state = Busy(spawn_blocking(move || {
let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
}));
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;

inner.state = Busy(blocking_task_join_handle);

return Ready(Ok(n));
}
Expand Down
15 changes: 15 additions & 0 deletions tokio/src/fs/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ where
JoinHandle { rx }
}

pub(super) fn spawn_mandatory_blocking<F, R>(f: F) -> Option<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let task = Box::new(move || {
let _ = tx.send(f());
});

QUEUE.with(|cell| cell.borrow_mut().push_back(task));

Some(JoinHandle { rx })
}

impl<T> Future for JoinHandle<T> {
type Output = Result<T, io::Error>;

Expand Down
6 changes: 5 additions & 1 deletion tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
//! compilation.

mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task};

cfg_fs! {
pub(crate) use pool::spawn_mandatory_blocking;
}

mod schedule;
mod shutdown;
Expand Down
59 changes: 55 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,40 @@ struct Shared {
worker_thread_index: usize,
}

type Task = task::UnownedTask<NoopSchedule>;
pub(crate) struct Task {
task: task::UnownedTask<NoopSchedule>,
mandatory: Mandatory,
}

#[derive(PartialEq, Eq)]
pub(crate) enum Mandatory {
#[cfg_attr(not(fs), allow(dead_code))]
Mandatory,
NonMandatory,
}

impl Task {
pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}

fn run(self) {
self.task.run();
}

fn shutdown_or_run_if_mandatory(self) {
match self.mandatory {
Mandatory::NonMandatory => self.task.shutdown(),
Mandatory::Mandatory => self.task.run(),
}
}
}

const KEEP_ALIVE: Duration = Duration::from_secs(10);

/// Runs the provided function on an executor dedicated to blocking operations.
/// Tasks will be scheduled as non-mandatory, meaning they may not get executed
/// in case of runtime shutdown.
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
Expand All @@ -84,6 +113,25 @@ where
rt.spawn_blocking(func)
}

cfg_fs! {
#[cfg_attr(any(
all(loom, not(test)), // the function is covered by loom tests
test
), allow(dead_code))]
/// Runs the provided function on an executor dedicated to blocking
/// operations. Tasks will be scheduled as mandatory, meaning they are
/// guaranteed to run unless a shutdown is already taking place. In case a
/// shutdown is already taking place, `None` will be returned.
pub(crate) fn spawn_mandatory_blocking<F, R>(func: F) -> Option<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = context::current();
rt.spawn_mandatory_blocking(func)
}
}

// ===== impl BlockingPool =====

impl BlockingPool {
Expand Down Expand Up @@ -176,8 +224,10 @@ impl Spawner {
let mut shared = self.inner.shared.lock();

if shared.shutdown {
// Shutdown the task
task.shutdown();
// Shutdown the task: it's fine to shutdown this task (even if
// mandatory) because it was scheduled after the shutdown of the
// runtime began.
task.task.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps include a comment here that it is ok to shut down a mandatory task here without running it because this only happens if the runtime has already started shutdown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think about it, we could still have a race where one worker thread successfully executes:

file.write(b"hello").await?;

but the runtime shutdown is scheduled from another thread, and the payload is never written to the file. The return value of blocking::Spawner::spawn (which conveys whether the task was actually scheduled) gets ignored in https://github.com/tokio-rs/tokio/pull/4316/files#diff-f1d42d2af51248e5dacb1cc3dd6bba12a956abba0d25567ee67bfd68d27e27d5R266

Our test doesn't show this error scenario because the write call and the runtime shutdown happen in the same thread, one after the other.

Perhaps spawn_mandatory_blocking should return an error when it tried to spawn a mandatory task but the runtime was already shutting down. That way, a call to write would only succeed if the write were to be actually attempted at shutdown. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, the current version of this PR (+ the comment you suggested) would already fix the original issue (#4296), so I leave it up to you whether you want to pursue the fix to #4316 (comment) in this PR or in a follow-up PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's simple to fix here, then go ahead and do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've taken a stab at fixing the problem.


// no need to even push this task; it would never get picked up
return Err(());
Expand Down Expand Up @@ -302,7 +352,8 @@ impl Inner {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
drop(shared);
task.shutdown();

task.shutdown_or_run_if_mandatory();

shared = self.shared.lock();
}
Expand Down
57 changes: 50 additions & 7 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,56 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(Box::new(func), None)
} else {
self.spawn_blocking_inner(func, None)
let (join_handle, _was_spawned) =
if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None)
} else {
self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None)
};

join_handle
}

cfg_fs! {
#[track_caller]
#[cfg_attr(any(
all(loom, not(test)), // the function is covered by loom tests
test
), allow(dead_code))]
pub(crate) fn spawn_mandatory_blocking<F, R>(&self, func: F) -> Option<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(
Box::new(func),
blocking::Mandatory::Mandatory,
None
)
} else {
self.spawn_blocking_inner(
func,
blocking::Mandatory::Mandatory,
None
)
};

if was_spawned {
Some(join_handle)
} else {
None
}
}
}

#[track_caller]
pub(crate) fn spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R>
pub(crate) fn spawn_blocking_inner<F, R>(
&self,
func: F,
is_mandatory: blocking::Mandatory,
name: Option<&str>,
) -> (JoinHandle<R>, bool)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -223,8 +264,10 @@ impl Handle {
let _ = name;

let (task, handle) = task::unowned(fut, NoopSchedule);
let _ = self.blocking_spawner.spawn(task, self);
handle
let spawned = self
.blocking_spawner
.spawn(blocking::Task::new(task, is_mandatory), self);
(handle, spawned.is_ok())
}

/// Runs a future to completion on this `Handle`'s associated `Runtime`.
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ cfg_rt! {
use blocking::BlockingPool;
pub(crate) use blocking::spawn_blocking;

cfg_trace! {
pub(crate) use blocking::Mandatory;
}

cfg_fs! {
pub(crate) use blocking::spawn_mandatory_blocking;
}

mod builder;
pub use self::builder::Builder;

Expand Down
50 changes: 50 additions & 0 deletions tokio/src/runtime/tests/loom_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,56 @@ fn blocking_shutdown() {
});
}

#[test]
fn spawn_mandatory_blocking_should_always_run() {
use crate::runtime::tests::loom_oneshot;
loom::model(|| {
let rt = runtime::Builder::new_current_thread().build().unwrap();

let (tx, rx) = loom_oneshot::channel();
let _enter = rt.enter();
runtime::spawn_blocking(|| {});
runtime::spawn_mandatory_blocking(move || {
let _ = tx.send(());
})
.unwrap();

drop(rt);

// This call will deadlock if `spawn_mandatory_blocking` doesn't run.
let () = rx.recv();
});
}

#[test]
fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread() {
use crate::runtime::tests::loom_oneshot;
loom::model(|| {
let rt = runtime::Builder::new_current_thread().build().unwrap();
let handle = rt.handle().clone();

// Drop the runtime in a different thread
{
loom::thread::spawn(move || {
drop(rt);
});
}

let _enter = handle.enter();
let (tx, rx) = loom_oneshot::channel();
let handle = runtime::spawn_mandatory_blocking(move || {
let _ = tx.send(());
});
Comment on lines +62 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this test not do an extra spawn_blocking first like the other test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each test is covering a different race related to shutting down:

  1. The first one makes sure that, if a thread in the blocking pool gets awaken and shutdown has been signaled already, all mandatory tasks will get executed. Reaching this scenario requires putting the initial spawn_blocking_task, otherwise the thread in the blocking pool will not get to check the shutdown flag before executing the mandatory task.
  2. The second one makes sure that, if the runtime had shutdown by the time the spawning thread was spawning the mandatory task, an error will be communicated to the caller. (Or the contrapositive: if calling spawn_mandatory_blocking doesn't err, the task will be executed).

I've checked that both tests are useful by changing the implementations slightly and seeing them fail.

I could also write a test that covers the combination of both races if you want, but it wouldn't add a lot of value IIUC because both races are independent


// handle.is_some() means that `spawn_mandatory_blocking`
// promised us to run the blocking task
if handle.is_some() {
// This call will deadlock if `spawn_mandatory_blocking` doesn't run.
let () = rx.recv();
}
});
}

fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ impl<'a> Builder<'a> {
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
context::current().spawn_blocking_inner(function, self.name)
use crate::runtime::Mandatory;
let (join_handle, _was_spawned) =
context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name);
join_handle
}
}