diff --git a/Cargo.toml b/Cargo.toml index 1b6416f07d7..c7b25ff82b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,9 +31,6 @@ unindent = { version = "0.2.1", optional = true } # support crate for multiple-pymethods feature inventory = { version = "0.3.0", optional = true } -# coroutine implementation -futures-util = "0.3" - # crate integrations that can be added using the eponymous features anyhow = { version = "1.0", optional = true } chrono = { version = "0.4.25", default-features = false, optional = true } diff --git a/src/coroutine.rs b/src/coroutine.rs index c4c7bbf29cd..050b0f5c741 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -1,7 +1,7 @@ //! Python coroutine implementation, used notably when wrapping `async fn` //! with `#[pyfunction]`/`#[pymethods]`. +use std::task::Waker; use std::{ - any::Any, future::Future, panic, pin::Pin, @@ -9,13 +9,11 @@ use std::{ task::{Context, Poll}, }; -use futures_util::FutureExt; use pyo3_macros::{pyclass, pymethods}; use crate::{ coroutine::waker::AsyncioWaker, exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration}, - panic::PanicException, pyclass::IterNextOutput, types::{PyIterator, PyString}, IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python, @@ -25,19 +23,18 @@ pub(crate) mod cancel; mod waker; use crate::coroutine::cancel::ThrowCallback; +use crate::panic::PanicException; pub use cancel::CancelHandle; const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine"; -type FutureOutput = Result, Box>; - /// Python coroutine wrapping a [`Future`]. #[pyclass(crate = "crate")] pub struct Coroutine { name: Option>, qualname_prefix: Option<&'static str>, throw_callback: Option, - future: Option + Send>>>, + future: Option> + Send>>>, waker: Option>, } @@ -68,7 +65,7 @@ impl Coroutine { name, qualname_prefix, throw_callback, - future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())), + future: Some(Box::pin(wrap)), waker: None, } } @@ -98,14 +95,20 @@ impl Coroutine { } else { self.waker = Some(Arc::new(AsyncioWaker::new())); } - let waker = futures_util::task::waker(self.waker.clone().unwrap()); + let waker = Waker::from(self.waker.clone().unwrap()); // poll the Rust future and forward its results if ready - if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) { - self.close(); - return match res { - Ok(res) => Ok(IterNextOutput::Return(res?)), - Err(err) => Err(PanicException::from_panic_payload(err)), - }; + // polling is UnwindSafe because the future is dropped in case of panic + let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker)); + match panic::catch_unwind(panic::AssertUnwindSafe(poll)) { + Ok(Poll::Ready(res)) => { + self.close(); + return Ok(IterNextOutput::Return(res?)); + } + Err(err) => { + self.close(); + return Err(PanicException::from_panic_payload(err)); + } + _ => {} } // otherwise, initialize the waker `asyncio.Future` if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? { @@ -113,7 +116,7 @@ impl Coroutine { // and will yield itself if its result has not been set in polling above if let Some(future) = PyIterator::from_object(future).unwrap().next() { // future has not been leaked into Python for now, and Rust code can only call - // `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap + // `set_result(None)` in `Wake` implementation, so it's safe to unwrap return Ok(IterNextOutput::Yield(future.unwrap().into())); } } diff --git a/src/coroutine/waker.rs b/src/coroutine/waker.rs index 7ed4103fbb7..8a1166ce3fb 100644 --- a/src/coroutine/waker.rs +++ b/src/coroutine/waker.rs @@ -1,11 +1,11 @@ use crate::sync::GILOnceCell; use crate::types::PyCFunction; use crate::{intern, wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python}; -use futures_util::task::ArcWake; use pyo3_macros::pyfunction; use std::sync::Arc; +use std::task::Wake; -/// Lazy `asyncio.Future` wrapper, implementing [`ArcWake`] by calling `Future.set_result`. +/// Lazy `asyncio.Future` wrapper, implementing [`Wake`] by calling `Future.set_result`. /// /// asyncio future is let uninitialized until [`initialize_future`][1] is called. /// If [`wake`][2] is called before future initialization (during Rust future polling), @@ -31,10 +31,14 @@ impl AsyncioWaker { } } -impl ArcWake for AsyncioWaker { - fn wake_by_ref(arc_self: &Arc) { +impl Wake for AsyncioWaker { + fn wake(self: Arc) { + self.wake_by_ref() + } + + fn wake_by_ref(self: &Arc) { Python::with_gil(|gil| { - if let Some(loop_and_future) = arc_self.0.get_or_init(gil, || None) { + if let Some(loop_and_future) = self.0.get_or_init(gil, || None) { loop_and_future .set_result(gil) .expect("unexpected error in coroutine waker");