Skip to content

Commit

Permalink
Fix async wake behavior
Browse files Browse the repository at this point in the history
If wake() is called while we are in the middle of polling a future, then
we should immediately call our continuation function to schedule another
poll.
  • Loading branch information
bendk committed Oct 9, 2023
1 parent 14547ae commit 6e2a088
Showing 1 changed file with 75 additions and 21 deletions.
96 changes: 75 additions & 21 deletions uniffi_core/src/ffi/rustfuture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,18 @@ pub unsafe fn rust_future_free<ReturnType>(handle: RustFutureHandle) {
/// foreign continuation callback. This enables us to uphold the [rust_future_poll] guarantee.
///
/// [ContinuationDataCell] also tracks cancellation, which is closely tied to continuation data.
#[derive(Debug)]
enum ContinuationDataCell {
/// No continuations set, neither wake() nor cancel() called.
Empty,
/// `wake()` was called when there was no continuation set. The next time `store` is called,
/// the continuation should be immediately invoked with `RustFuturePoll::MaybeReady`
Waked,
/// The future has been cancelled, any future `store` calls should immediately result in the
/// continuation being called with `RustFuturePoll::Ready`.
Cancelled,
/// Continuation data set, the next time `wake()` is called is called, we should invoke the
/// continuation with it.
Set(*const ()),
}

Expand All @@ -235,33 +244,41 @@ impl ContinuationDataCell {
Self::Empty
}

/// Store new continuation data
/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
fn store(&mut self, data: *const ()) {
// If we're cancelled, then call the continuation immediately rather than storing it
if matches!(self, Self::Cancelled) {
call_continuation(data, RustFuturePoll::Ready);
return;
}

match mem::replace(self, Self::Set(data)) {
Self::Empty => (),
Self::Cancelled => unreachable!(),
match self {
Self::Empty => *self = Self::Set(data),
Self::Set(old_data) => {
log::error!(
"store: observed Self::Set state, is poll() being called from multiple threads at once?"
"store: observed `Self::Set` state. Is poll() being called from multiple threads at once?"
);
call_continuation(old_data, RustFuturePoll::Ready);
call_continuation(*old_data, RustFuturePoll::Ready);
*self = Self::Set(data);
}
Self::Waked => {
*self = Self::Empty;
call_continuation(data, RustFuturePoll::MaybeReady);
}
Self::Cancelled => {
call_continuation(data, RustFuturePoll::Ready);
}
}
}

fn send(&mut self) {
if matches!(self, Self::Cancelled) {
return;
}

if let Self::Set(old_data) = mem::replace(self, Self::Empty) {
call_continuation(old_data, RustFuturePoll::MaybeReady);
fn wake(&mut self) {
match self {
// If we had a continuation set, then call it and transition to the `Empty` state.
Self::Set(old_data) => {
let old_data = *old_data;
*self = Self::Empty;
call_continuation(old_data, RustFuturePoll::MaybeReady);
}
// If we were in the `Empty` state, then transition to `Waked`. The next time `store`
// is called, we will immediately call the continuation.
Self::Empty => *self = Self::Waked,
// This is a no-op if we were in the `Cancelled` or `Waked` state.
_ => (),
}
}

Expand Down Expand Up @@ -434,7 +451,7 @@ where
}

fn wake(&self) {
self.continuation_data.lock().unwrap().send();
self.continuation_data.lock().unwrap().wake();
}

fn cancel(&self) {
Expand Down Expand Up @@ -517,7 +534,10 @@ mod tests {
use super::*;
use crate::{test_util::TestError, Lift, RustBuffer, RustCallStatusCode};
use once_cell::sync::OnceCell;
use std::task::Waker;
use std::{
sync::atomic::{AtomicBool, Ordering},
task::Waker,
};

// Sender/Receiver pair that we use for testing
struct Channel {
Expand Down Expand Up @@ -710,4 +730,38 @@ mod tests {
rust_future.ffi_free();
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready));
}

// Test what happens if we see a `wake()` call while we're polling the future. This can
// happen, for example, with futures that are handled by a tokio thread pool. We should
// schedule another poll of the future in this case.
#[test]
fn test_wake_during_poll() {
setup_continuation_callback();
let first_time = AtomicBool::new(true);
let future = std::future::poll_fn(move |ctx| {
if first_time.load(Ordering::Relaxed) {
first_time.store(false, Ordering::Relaxed);
// Wake the future while we are in the middle of polling it
ctx.waker().clone().wake();
Poll::Pending
} else {
// The second time we're polled' we're ready
Poll::Ready("All done".to_owned())
}
});
let rust_future: Arc<dyn RustFutureFfi<RustBuffer>> =
RustFuture::new(future, crate::UniFfiTag);
let continuation_result = poll(&rust_future);
// The continuation function should called immediately
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady));
// A second poll should finish the future
let continuation_result = poll(&rust_future);
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready));
let (return_buf, call_status) = complete(rust_future);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(
<String as Lift<crate::UniFfiTag>>::try_lift(return_buf).unwrap(),
"All done"
);
}
}

0 comments on commit 6e2a088

Please sign in to comment.