Skip to content

Commit

Permalink
Merge pull request #1785 from bendk/async-wake-fix
Browse files Browse the repository at this point in the history
Fix async wake behavior
  • Loading branch information
bendk authored Oct 11, 2023
2 parents c890e8b + e9d79bc commit 48b1f51
Showing 1 changed file with 71 additions and 20 deletions.
91 changes: 71 additions & 20 deletions uniffi_core/src/ffi/rustfuture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,18 @@ pub unsafe fn rust_future_free<ReturnType>(handle: RustFutureHandle) {
/// foreign continuation callback. This enables us to uphold the [rust_future_poll] guarantee.
///
/// [ContinuationDataCell] also tracks cancellation, which is closely tied to continuation data.
#[derive(Debug)]
enum ContinuationDataCell {
/// No continuations set, neither wake() nor cancel() called.
Empty,
/// `wake()` was called when there was no continuation set. The next time `store` is called,
/// the continuation should be immediately invoked with `RustFuturePoll::MaybeReady`
Waked,
/// The future has been cancelled, any future `store` calls should immediately result in the
/// continuation being called with `RustFuturePoll::Ready`.
Cancelled,
/// Continuation data set, the next time `wake()` is called is called, we should invoke the
/// continuation with it.
Set(*const ()),
}

Expand All @@ -235,33 +244,41 @@ impl ContinuationDataCell {
Self::Empty
}

/// Store new continuation data
/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
fn store(&mut self, data: *const ()) {
// If we're cancelled, then call the continuation immediately rather than storing it
if matches!(self, Self::Cancelled) {
call_continuation(data, RustFuturePoll::Ready);
return;
}

match mem::replace(self, Self::Set(data)) {
Self::Empty => (),
Self::Cancelled => unreachable!(),
match self {
Self::Empty => *self = Self::Set(data),
Self::Set(old_data) => {
log::error!(
"store: observed Self::Set state, is poll() being called from multiple threads at once?"
"store: observed `Self::Set` state. Is poll() being called from multiple threads at once?"
);
call_continuation(old_data, RustFuturePoll::Ready);
call_continuation(*old_data, RustFuturePoll::Ready);
*self = Self::Set(data);
}
Self::Waked => {
*self = Self::Empty;
call_continuation(data, RustFuturePoll::MaybeReady);
}
Self::Cancelled => {
call_continuation(data, RustFuturePoll::Ready);
}
}
}

fn send(&mut self) {
if matches!(self, Self::Cancelled) {
return;
}

if let Self::Set(old_data) = mem::replace(self, Self::Empty) {
call_continuation(old_data, RustFuturePoll::MaybeReady);
fn wake(&mut self) {
match self {
// If we had a continuation set, then call it and transition to the `Empty` state.
Self::Set(old_data) => {
let old_data = *old_data;
*self = Self::Empty;
call_continuation(old_data, RustFuturePoll::MaybeReady);
}
// If we were in the `Empty` state, then transition to `Waked`. The next time `store`
// is called, we will immediately call the continuation.
Self::Empty => *self = Self::Waked,
// This is a no-op if we were in the `Cancelled` or `Waked` state.
_ => (),
}
}

Expand Down Expand Up @@ -434,7 +451,7 @@ where
}

fn wake(&self) {
self.continuation_data.lock().unwrap().send();
self.continuation_data.lock().unwrap().wake();
}

fn cancel(&self) {
Expand Down Expand Up @@ -710,4 +727,38 @@ mod tests {
rust_future.ffi_free();
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready));
}

// Test what happens if we see a `wake()` call while we're polling the future. This can
// happen, for example, with futures that are handled by a tokio thread pool. We should
// schedule another poll of the future in this case.
#[test]
fn test_wake_during_poll() {
setup_continuation_callback();
let mut first_time = true;
let future = std::future::poll_fn(move |ctx| {
if first_time {
first_time = false;
// Wake the future while we are in the middle of polling it
ctx.waker().clone().wake();
Poll::Pending
} else {
// The second time we're polled, we're ready
Poll::Ready("All done".to_owned())
}
});
let rust_future: Arc<dyn RustFutureFfi<RustBuffer>> =
RustFuture::new(future, crate::UniFfiTag);
let continuation_result = poll(&rust_future);
// The continuation function should called immediately
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady));
// A second poll should finish the future
let continuation_result = poll(&rust_future);
assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready));
let (return_buf, call_status) = complete(rust_future);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(
<String as Lift<crate::UniFfiTag>>::try_lift(return_buf).unwrap(),
"All done"
);
}
}

0 comments on commit 48b1f51

Please sign in to comment.