From 3b67140adec8a76515aa3bfbabe52695b76f56d6 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Tue, 27 Nov 2018 18:52:03 +0100 Subject: [PATCH] Remove StreamObj, caution against using FutureObj --- futures-core/src/future/future_obj.rs | 21 +-- futures-core/src/future/mod.rs | 5 + futures-core/src/stream/mod.rs | 6 +- futures-core/src/stream/stream_obj.rs | 258 -------------------------- futures-util/src/future/mod.rs | 6 +- futures/src/lib.rs | 7 +- futures/tests/future_obj.rs | 4 +- futures/tests/futures_ordered.rs | 10 +- futures/tests/futures_unordered.rs | 15 +- futures/tests/recurse.rs | 8 +- 10 files changed, 47 insertions(+), 293 deletions(-) delete mode 100644 futures-core/src/stream/stream_obj.rs diff --git a/futures-core/src/future/future_obj.rs b/futures-core/src/future/future_obj.rs index 027b8fd02d..d76770a2b9 100644 --- a/futures-core/src/future/future_obj.rs +++ b/futures-core/src/future/future_obj.rs @@ -9,9 +9,9 @@ use core::{ /// A custom trait object for polling futures, roughly akin to /// `Box + 'a>`. /// -/// This custom trait object was introduced for two reasons: -/// - Currently it is not possible to take `dyn Trait` by value and -/// `Box` is not available in no_std contexts. +/// This custom trait object was introduced as currently it is not possible to +/// take `dyn Trait` by value and `Box` is not available in no_std +/// contexts. pub struct LocalFutureObj<'a, T> { ptr: *mut (), poll_fn: unsafe fn(*mut (), &mut Context<'_>) -> Poll, @@ -79,14 +79,13 @@ impl Drop for LocalFutureObj<'_, T> { /// A custom trait object for polling futures, roughly akin to /// `Box + Send + 'a>`. /// -/// This custom trait object was introduced for two reasons: -/// - Currently it is not possible to take `dyn Trait` by value and -/// `Box` is not available in no_std contexts. -/// - The `Future` trait is currently not object safe: The `Future::poll` -/// method makes uses the arbitrary self types feature and traits in which -/// this feature is used are currently not object safe due to current compiler -/// limitations. (See tracking issue for arbitrary self types for more -/// information #44874) +/// This custom trait object was introduced as currently it is not possible to +/// take `dyn Trait` by value and `Box` is not available in no_std +/// contexts. +/// +/// You should generally not need to use this type outside of `no_std` or when +/// implementing `Spawn`, consider using [`BoxFuture`](crate::future::BoxFuture) +/// instead. pub struct FutureObj<'a, T>(LocalFutureObj<'a, T>); impl Unpin for FutureObj<'_, T> {} diff --git a/futures-core/src/future/mod.rs b/futures-core/src/future/mod.rs index e039b3960b..540faf41b2 100644 --- a/futures-core/src/future/mod.rs +++ b/futures-core/src/future/mod.rs @@ -9,6 +9,11 @@ pub use core::future::Future; mod future_obj; pub use self::future_obj::{FutureObj, LocalFutureObj, UnsafeFutureObj}; +#[cfg(feature = "alloc")] +/// An owned dynamically typed [`Future`] for use in cases where you can't +/// statically type your result or need to add some indirection. +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + /// A `Future` or `TryFuture` which tracks whether or not the underlying future /// should no longer be polled. /// diff --git a/futures-core/src/stream/mod.rs b/futures-core/src/stream/mod.rs index 54cb56033a..c4ac7afa08 100644 --- a/futures-core/src/stream/mod.rs +++ b/futures-core/src/stream/mod.rs @@ -4,8 +4,10 @@ use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -mod stream_obj; -pub use self::stream_obj::{StreamObj,LocalStreamObj,UnsafeStreamObj}; +#[cfg(feature = "alloc")] +/// An owned dynamically typed [`Stream`] for use in cases where you can't +/// statically type your result or need to add some indirection. +pub type BoxStream<'a, T> = Pin + Send + 'a>>; /// A stream of values produced asynchronously. /// diff --git a/futures-core/src/stream/stream_obj.rs b/futures-core/src/stream/stream_obj.rs deleted file mode 100644 index 05719260ba..0000000000 --- a/futures-core/src/stream/stream_obj.rs +++ /dev/null @@ -1,258 +0,0 @@ -use super::Stream; -use core::fmt; -use core::marker::PhantomData; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// A custom trait object for polling streams, roughly akin to -/// `Box + 'a>`. -/// -/// This custom trait object was introduced for two reasons: -/// - Currently it is not possible to take `dyn Trait` by value and -/// `Box` is not available in no_std contexts. -pub struct LocalStreamObj<'a, T> { - ptr: *mut (), - poll_next_fn: unsafe fn(*mut (), &mut Context<'_>) -> Poll>, - drop_fn: unsafe fn(*mut ()), - _marker: PhantomData<&'a ()>, -} - -impl Unpin for LocalStreamObj<'_, T> {} - -impl<'a, T> LocalStreamObj<'a, T> { - /// Create a `LocalStreamObj` from a custom trait object representation. - #[inline] - pub fn new + 'a>(f: F) -> LocalStreamObj<'a, T> { - LocalStreamObj { - ptr: f.into_raw(), - poll_next_fn: F::poll_next, - drop_fn: F::drop, - _marker: PhantomData, - } - } - - /// Converts the `LocalStreamObj` into a `StreamObj` - /// To make this operation safe one has to ensure that the `UnsafeStreamObj` - /// instance from which this `LocalStreamObj` was created actually - /// implements `Send`. - #[inline] - pub unsafe fn into_stream_obj(self) -> StreamObj<'a, T> { - StreamObj(self) - } -} - -impl fmt::Debug for LocalStreamObj<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LocalStreamObj").finish() - } -} - -impl<'a, T> From> for LocalStreamObj<'a, T> { - #[inline] - fn from(f: StreamObj<'a, T>) -> LocalStreamObj<'a, T> { - f.0 - } -} - -impl Stream for LocalStreamObj<'_, T> { - type Item = T; - - #[inline] - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - unsafe { (self.poll_next_fn)(self.ptr, cx) } - } -} - -impl Drop for LocalStreamObj<'_, T> { - fn drop(&mut self) { - unsafe { (self.drop_fn)(self.ptr) } - } -} - -/// A custom trait object for polling streams, roughly akin to -/// `Box + Send + 'a>`. -/// -/// This custom trait object was introduced for two reasons: -/// - Currently it is not possible to take `dyn Trait` by value and -/// `Box` is not available in no_std contexts. -/// - The `Stream` trait is currently not object safe: The `Stream::poll_next` -/// method makes uses the arbitrary self types feature and traits in which -/// this feature is used are currently not object safe due to current compiler -/// limitations. (See tracking issue for arbitray self types for more -/// information #44874) -pub struct StreamObj<'a, T>(LocalStreamObj<'a, T>); - -impl Unpin for StreamObj<'_, T> {} -unsafe impl Send for StreamObj<'_, T> {} - -impl<'a, T> StreamObj<'a, T> { - /// Create a `StreamObj` from a custom trait object representation. - #[inline] - pub fn new + Send>(f: F) -> StreamObj<'a, T> { - StreamObj(LocalStreamObj::new(f)) - } -} - -impl fmt::Debug for StreamObj<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StreamObj").finish() - } -} - -impl Stream for StreamObj<'_, T> { - type Item = T; - - #[inline] - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }; - pinned_field.poll_next(cx) - } -} - -/// A custom implementation of a stream trait object for `StreamObj`, providing -/// a hand-rolled vtable. -/// -/// This custom representation is typically used only in `no_std` contexts, -/// where the default `Box`-based implementation is not available. -/// -/// The implementor must guarantee that it is safe to call `poll_next` -/// repeatedly (in a non-concurrent fashion) with the result of `into_raw` until -/// `drop` is called. -pub unsafe trait UnsafeStreamObj<'a, T>: 'a { - /// Convert an owned instance into a (conceptually owned) void pointer. - fn into_raw(self) -> *mut (); - - /// Poll the stream represented by the given void pointer. - /// - /// # Safety - /// - /// The trait implementor must guarantee that it is safe to repeatedly call - /// `poll_next` with the result of `into_raw` until `drop` is called; such - /// calls are not, however, allowed to race with each other or with calls to - /// `drop`. - unsafe fn poll_next( - ptr: *mut (), - cx: &mut Context<'_>, - ) -> Poll>; - - /// Drops the stream represented by the given void pointer. - /// - /// # Safety - /// - /// The trait implementor must guarantee that it is safe to call this - /// function once per `into_raw` invocation; that call cannot race with - /// other calls to `drop` or `poll_next`. - unsafe fn drop(ptr: *mut ()); -} - -unsafe impl<'a, T, F> UnsafeStreamObj<'a, T> for &'a mut F -where - F: Stream + Unpin + 'a, -{ - fn into_raw(self) -> *mut () { - self as *mut F as *mut () - } - - unsafe fn poll_next( - ptr: *mut (), - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new_unchecked(&mut *(ptr as *mut F)).poll_next(cx) - } - - unsafe fn drop(_ptr: *mut ()) {} -} - -unsafe impl<'a, T, F> UnsafeStreamObj<'a, T> for Pin<&'a mut F> -where - F: Stream + 'a, -{ - fn into_raw(self) -> *mut () { - unsafe { Pin::get_unchecked_mut(self) as *mut F as *mut () } - } - - unsafe fn poll_next( - ptr: *mut (), - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new_unchecked(&mut *(ptr as *mut F)).poll_next(cx) - } - - unsafe fn drop(_ptr: *mut ()) {} -} - -#[cfg(feature = "alloc")] -mod if_alloc { - use super::*; - use core::mem; - use alloc::boxed::Box; - - unsafe impl<'a, T, F> UnsafeStreamObj<'a, T> for Box - where F: Stream + 'a - { - fn into_raw(self) -> *mut () { - Box::into_raw(self) as *mut () - } - - unsafe fn poll_next(ptr: *mut (), cx: &mut Context<'_>) -> Poll> { - let ptr = ptr as *mut F; - let pin: Pin<&mut F> = Pin::new_unchecked(&mut *ptr); - pin.poll_next(cx) - } - - unsafe fn drop(ptr: *mut ()) { - drop(Box::from_raw(ptr as *mut F)) - } - } - - unsafe impl<'a, T, F> UnsafeStreamObj<'a, T> for Pin> - where F: Stream + 'a - { - fn into_raw(mut self) -> *mut () { - let mut_ref: &mut F = unsafe { Pin::get_unchecked_mut(self.as_mut()) }; - let ptr = mut_ref as *mut F as *mut (); - mem::forget(self); // Don't drop the box - ptr - } - - unsafe fn poll_next(ptr: *mut (), cx: &mut Context<'_>) -> Poll> { - let ptr = ptr as *mut F; - let pin: Pin<&mut F> = Pin::new_unchecked(&mut *ptr); - pin.poll_next(cx) - } - - unsafe fn drop(ptr: *mut ()) { - drop(Box::from_raw(ptr as *mut F)) - } - } - - impl<'a, F: Stream + Send + 'a> From>> for StreamObj<'a, ()> { - fn from(boxed: Pin>) -> Self { - StreamObj::new(boxed) - } - } - - impl<'a, F: Stream + Send + 'a> From> for StreamObj<'a, ()> { - fn from(boxed: Box) -> Self { - StreamObj::new(boxed) - } - } - - impl<'a, F: Stream + 'a> From>> for LocalStreamObj<'a, ()> { - fn from(boxed: Pin>) -> Self { - LocalStreamObj::new(boxed) - } - } - - impl<'a, F: Stream + 'a> From> for LocalStreamObj<'a, ()> { - fn from(boxed: Box) -> Self { - LocalStreamObj::new(boxed) - } - } -} diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 4b24468881..b42ba2f556 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -9,6 +9,8 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "alloc")] use alloc::boxed::Box; +#[cfg(feature = "alloc")] +use futures_core::future::BoxFuture; // re-export for `select!` #[doc(hidden)] @@ -491,8 +493,8 @@ pub trait FutureExt: Future { /// Wrap the future in a Box, pinning it. #[cfg(feature = "alloc")] - fn boxed(self) -> Pin> - where Self: Sized + fn boxed(self) -> BoxFuture<'static, Self::Output> + where Self: Sized + Send + 'static { Box::pin(self) } diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 7e765177d7..005beefeda 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -189,6 +189,9 @@ pub mod future { FutureObj, LocalFutureObj, UnsafeFutureObj, }; + #[cfg(feature = "alloc")] + pub use futures_core::future::BoxFuture; + pub use futures_util::future::{ empty, Empty, lazy, Lazy, @@ -338,9 +341,11 @@ pub mod stream { pub use futures_core::stream::{ Stream, TryStream, FusedStream, - StreamObj, LocalStreamObj, UnsafeStreamObj, }; + #[cfg(feature = "alloc")] + pub use futures_core::stream::BoxStream; + pub use futures_util::stream::{ iter, Iter, repeat, Repeat, diff --git a/futures/tests/future_obj.rs b/futures/tests/future_obj.rs index 85c9fcad51..b914043463 100644 --- a/futures/tests/future_obj.rs +++ b/futures/tests/future_obj.rs @@ -6,7 +6,7 @@ use futures::task::{Context, Poll}; #[test] fn dropping_does_not_segfault() { - FutureObj::new(async { String::new() }.boxed()); + FutureObj::new(Box::new(async { String::new() })); } #[test] @@ -29,7 +29,7 @@ fn dropping_drops_the_future() { } } - FutureObj::new(Inc(&mut times_dropped).boxed()); + FutureObj::new(Box::new(Inc(&mut times_dropped))); assert_eq!(times_dropped, 1); } diff --git a/futures/tests/futures_ordered.rs b/futures/tests/futures_ordered.rs index 61ec590027..c51cb551d3 100644 --- a/futures/tests/futures_ordered.rs +++ b/futures/tests/futures_ordered.rs @@ -2,7 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, FutureExt, FutureObj}; +use futures::future::{self, join, FutureExt}; use futures::stream::{StreamExt, FuturesOrdered}; use futures_test::task::noop_context; @@ -34,8 +34,8 @@ fn works_2() { let (c_tx, c_rx) = oneshot::channel::(); let mut stream = vec![ - FutureObj::new(Box::new(a_rx)), - FutureObj::new(Box::new(join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)))), + a_rx.boxed(), + join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), ].into_iter().collect::>(); let mut cx = noop_context(); @@ -65,10 +65,10 @@ fn queue_never_unblocked() { let (b_tx, b_rx) = oneshot::channel::>(); let (c_tx, c_rx) = oneshot::channel::>(); - let mut stream = futures_ordered(vec![ + let mut stream = vec![ Box::new(a_rx) as Box>, Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box))) as _, - ]); + ].into_iter().collect::>(); with_no_spawn_context(|cx| { for _ in 0..10 { diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index a007dc9a75..dce4fa9130 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -2,13 +2,12 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, FutureExt, FutureObj}; +use futures::future::{self, join, FutureExt}; use futures::stream::{StreamExt, FuturesUnordered}; use futures::task::Poll; use futures_test::{assert_stream_done, assert_stream_next}; use futures_test::future::FutureTestExt; use futures_test::task::noop_context; -use std::boxed::Box; #[test] fn works_1() { @@ -35,8 +34,8 @@ fn works_2() { let (c_tx, c_rx) = oneshot::channel::(); let mut stream = vec![ - FutureObj::new(Box::new(a_rx)), - FutureObj::new(Box::new(join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)))), + a_rx.boxed(), + join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), ].into_iter().collect::>(); a_tx.send(9).unwrap(); @@ -67,10 +66,10 @@ fn finished_future() { let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); - let mut stream = futures_unordered(vec![ - FutureObj::new(Box::new(a_rx)), - //FutureObj::new(Box::new(b_rx.select(c_rx))), - ]); + let mut stream = vec![ + a_rx.boxed(), + b_rx.select(c_rx).boxed(), + ].into_iter().collect::>(); support::with_noop_waker_context(f)(|cx| { for _ in 0..10 { diff --git a/futures/tests/recurse.rs b/futures/tests/recurse.rs index 051ab8c3a0..5c64778c28 100644 --- a/futures/tests/recurse.rs +++ b/futures/tests/recurse.rs @@ -1,18 +1,18 @@ #![feature(futures_api)] use futures::executor::block_on; -use futures::future::{self, FutureExt, FutureObj}; +use futures::future::{self, FutureExt, BoxFuture}; use std::sync::mpsc; use std::thread; #[test] fn lots() { - fn do_it(input: (i32, i32)) -> FutureObj<'static, i32> { + fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> { let (n, x) = input; if n == 0 { - FutureObj::new(Box::new(future::ready(x))) + future::ready(x).boxed() } else { - FutureObj::new(Box::new(future::ready((n - 1, x + n)).then(do_it))) + future::ready((n - 1, x + n)).then(do_it).boxed() } }