diff --git a/Cargo.toml b/Cargo.toml index 430205d8ca..7a6eeee576 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,6 @@ members = [ "futures-util", "futures-test", ] + +[patch.crates-io] +pin-project = { git = "https://github.com/taiki-e/pin-project", branch = "pin_project" } diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index f394b76fe2..814febeaa6 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -40,6 +40,7 @@ slab = { version = "0.4", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } +pin-project = "0.3.2" pin-utils = "0.1.0-alpha.4" [dev-dependencies] diff --git a/futures-util/src/future/catch_unwind.rs b/futures-util/src/future/catch_unwind.rs index 730462b1ef..98a689240d 100644 --- a/futures-util/src/future/catch_unwind.rs +++ b/futures-util/src/future/catch_unwind.rs @@ -1,21 +1,20 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; use std::any::Any; -use std::pin::Pin; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; -use std::prelude::v1::*; +use std::pin::Pin; /// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct CatchUnwind where Fut: Future { + #[pin] future: Fut, } impl CatchUnwind where Fut: Future + UnwindSafe { - unsafe_pinned!(future: Fut); - pub(super) fn new(future: Fut) -> CatchUnwind { CatchUnwind { future } } @@ -26,7 +25,8 @@ impl Future for CatchUnwind { type Output = Result>; + #[pin_project(self)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok) + catch_unwind(AssertUnwindSafe(|| self.future.poll(cx)))?.map(Ok) } } diff --git a/futures-util/src/future/flatten.rs b/futures-util/src/future/flatten.rs index d051401da6..aaa7c2b854 100644 --- a/futures-util/src/future/flatten.rs +++ b/futures-util/src/future/flatten.rs @@ -3,14 +3,16 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`flatten`](super::FutureExt::flatten) method. +#[unsafe_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Flatten where Fut: Future, Fut::Output: Future, { + #[pin] state: Chain, } @@ -18,8 +20,6 @@ impl Flatten where Fut: Future, Fut::Output: Future, { - unsafe_pinned!(state: Chain); - pub(super) fn new(future: Fut) -> Flatten { Flatten { state: Chain::new(future, ()), @@ -51,7 +51,8 @@ impl Future for Flatten { type Output = ::Output; + #[pin_project(self)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.state().poll(cx, |a, ()| a) + self.state.poll(cx, |a, ()| a) } } diff --git a/futures-util/src/future/fuse.rs b/futures-util/src/future/fuse.rs index 978cf1cc5d..f47e73e758 100644 --- a/futures-util/src/future/fuse.rs +++ b/futures-util/src/future/fuse.rs @@ -1,18 +1,18 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`fuse`](super::FutureExt::fuse) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fuse { + #[pin] future: Option, } impl Fuse { - unsafe_pinned!(future: Option); - pub(super) fn new(f: Fut) -> Fuse { Fuse { future: Some(f), @@ -79,13 +79,14 @@ impl FusedFuture for Fuse { impl Future for Fuse { type Output = Fut::Output; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let v = match self.as_mut().future().as_pin_mut() { + let v = match self.future.as_mut().as_pin_mut() { Some(fut) => ready!(fut.poll(cx)), None => return Poll::Pending, }; - self.as_mut().future().set(None); + self.future.set(None); Poll::Ready(v) } } diff --git a/futures-util/src/future/inspect.rs b/futures-util/src/future/inspect.rs index f9e1fc57cc..ff594363cc 100644 --- a/futures-util/src/future/inspect.rs +++ b/futures-util/src/future/inspect.rs @@ -1,20 +1,19 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`inspect`](super::FutureExt::inspect) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Inspect where Fut: Future { + #[pin] future: Fut, f: Option, } impl Inspect { - unsafe_pinned!(future: Fut); - unsafe_unpinned!(f: Option); - pub(super) fn new(future: Fut, f: F) -> Inspect { Inspect { future, @@ -23,8 +22,6 @@ impl Inspect { } } -impl Unpin for Inspect {} - impl FusedFuture for Inspect { fn is_terminated(&self) -> bool { self.future.is_terminated() } } @@ -35,9 +32,10 @@ impl Future for Inspect { type Output = Fut::Output; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let e = ready!(self.as_mut().future().poll(cx)); - let f = self.as_mut().f().take().expect("cannot poll Inspect twice"); + let e = ready!(self.future.as_mut().poll(cx)); + let f = self.f.take().expect("cannot poll Inspect twice"); f(&e); Poll::Ready(e) } diff --git a/futures-util/src/future/into_stream.rs b/futures-util/src/future/into_stream.rs index adc987e6a4..37ad5d678a 100644 --- a/futures-util/src/future/into_stream.rs +++ b/futures-util/src/future/into_stream.rs @@ -2,18 +2,18 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`into_stream`](super::FutureExt::into_stream) method. +#[unsafe_project(Unpin)] #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct IntoStream { + #[pin] future: Option } impl IntoStream { - unsafe_pinned!(future: Option); - pub(super) fn new(future: Fut) -> IntoStream { IntoStream { future: Some(future) @@ -24,13 +24,14 @@ impl IntoStream { impl Stream for IntoStream { type Item = Fut::Output; + #[pin_project(self)] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let v = match self.as_mut().future().as_pin_mut() { + let v = match self.future.as_mut().as_pin_mut() { Some(fut) => ready!(fut.poll(cx)), None => return Poll::Ready(None), }; - self.as_mut().future().set(None); + self.future.set(None); Poll::Ready(Some(v)) } } diff --git a/futures-util/src/future/map.rs b/futures-util/src/future/map.rs index 787890ee3d..928b6dde01 100644 --- a/futures-util/src/future/map.rs +++ b/futures-util/src/future/map.rs @@ -1,28 +1,24 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`map`](super::FutureExt::map) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Map { + #[pin] future: Fut, f: Option, } impl Map { - unsafe_pinned!(future: Fut); - unsafe_unpinned!(f: Option); - - /// Creates a new Map. pub(super) fn new(future: Fut, f: F) -> Map { Map { future, f: Some(f) } } } -impl Unpin for Map {} - impl FusedFuture for Map { fn is_terminated(&self) -> bool { self.f.is_none() } } @@ -33,12 +29,13 @@ impl Future for Map { type Output = T; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut() - .future() + self.future + .as_mut() .poll(cx) .map(|output| { - let f = self.f().take() + let f = self.f.take() .expect("Map must not be polled after it returned `Poll::Ready`"); f(output) }) diff --git a/futures-util/src/future/never_error.rs b/futures-util/src/future/never_error.rs index 8924ec6ceb..745811ee53 100644 --- a/futures-util/src/future/never_error.rs +++ b/futures-util/src/future/never_error.rs @@ -1,25 +1,23 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{self, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`never_error`](super::FutureExt::never_error) combinator. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct NeverError { + #[pin] future: Fut, } impl NeverError { - unsafe_pinned!(future: Fut); - pub(super) fn new(future: Fut) -> NeverError { NeverError { future } } } -impl Unpin for NeverError {} - impl FusedFuture for NeverError { fn is_terminated(&self) -> bool { self.future.is_terminated() } } @@ -29,7 +27,8 @@ impl Future for NeverError { type Output = Result; + #[pin_project(self)] fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.future().poll(cx).map(Ok) + self.future.poll(cx).map(Ok) } } diff --git a/futures-util/src/future/option.rs b/futures-util/src/future/option.rs index 53e9a1e92b..48edc3ca57 100644 --- a/futures-util/src/future/option.rs +++ b/futures-util/src/future/option.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// A future representing a value which may or may not be present. /// @@ -23,24 +23,23 @@ use pin_utils::unsafe_pinned; /// assert_eq!(a.await, None); /// # }); /// ``` +#[unsafe_project(Unpin)] #[derive(Debug, Clone)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct OptionFuture { + #[pin] option: Option, } -impl OptionFuture { - unsafe_pinned!(option: Option); -} - impl Future for OptionFuture { type Output = Option; + #[pin_project(self)] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - match self.option().as_pin_mut() { + match self.option.as_pin_mut() { Some(x) => x.poll(cx).map(Some), None => Poll::Ready(None), } diff --git a/futures-util/src/future/remote_handle.rs b/futures-util/src/future/remote_handle.rs index e3223c6c24..4f6645de9d 100644 --- a/futures-util/src/future/remote_handle.rs +++ b/futures-util/src/future/remote_handle.rs @@ -5,7 +5,7 @@ use { future::Future, task::{Context, Poll}, }, - pin_utils::{unsafe_pinned, unsafe_unpinned}, + pin_project::{pin_project, unsafe_project}, std::{ any::Any, fmt, @@ -54,10 +54,12 @@ type SendMsg = Result<::Output, Box<(dyn Any + Send + 'stati /// A future which sends its output to the corresponding `RemoteHandle`. /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). +#[unsafe_project(Unpin)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Remote { tx: Option>>, keep_running: Arc, + #[pin] future: CatchUnwind>, } @@ -69,29 +71,23 @@ impl fmt::Debug for Remote { } } -impl Unpin for Remote {} - -impl Remote { - unsafe_pinned!(future: CatchUnwind>); - unsafe_unpinned!(tx: Option>>); -} - impl Future for Remote { type Output = (); + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if let Poll::Ready(_) = self.as_mut().tx().as_mut().unwrap().poll_cancel(cx) { + if let Poll::Ready(_) = self.tx.as_mut().unwrap().poll_cancel(cx) { if !self.keep_running.load(Ordering::SeqCst) { // Cancelled, bail out return Poll::Ready(()) } } - let output = ready!(self.as_mut().future().poll(cx)); + let output = ready!(self.future.as_mut().poll(cx)); // if the receiving end has gone away then that's ok, we just ignore the // send error here. - drop(self.as_mut().tx().take().unwrap().send(output)); + drop(self.tx.take().unwrap().send(output)); Poll::Ready(()) } } diff --git a/futures-util/src/future/then.rs b/futures-util/src/future/then.rs index e03683f2d3..c5eb2adc7f 100644 --- a/futures-util/src/future/then.rs +++ b/futures-util/src/future/then.rs @@ -2,12 +2,14 @@ use super::Chain; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`then`](super::FutureExt::then) method. +#[unsafe_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Then { + #[pin] chain: Chain, } @@ -15,9 +17,6 @@ impl Then where Fut1: Future, Fut2: Future, { - unsafe_pinned!(chain: Chain); - - /// Creates a new `Then`. pub(super) fn new(future: Fut1, f: F) -> Then { Then { chain: Chain::new(future, f), @@ -36,7 +35,8 @@ impl Future for Then { type Output = Fut2::Output; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().chain().poll(cx, |output, f| f(output)) + self.chain.poll(cx, |output, f| f(output)) } } diff --git a/futures-util/src/future/unit_error.rs b/futures-util/src/future/unit_error.rs index 679e988b16..2925eedee3 100644 --- a/futures-util/src/future/unit_error.rs +++ b/futures-util/src/future/unit_error.rs @@ -1,25 +1,23 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct UnitError { + #[pin] future: Fut, } impl UnitError { - unsafe_pinned!(future: Fut); - pub(super) fn new(future: Fut) -> UnitError { UnitError { future } } } -impl Unpin for UnitError {} - impl FusedFuture for UnitError { fn is_terminated(&self) -> bool { self.future.is_terminated() } } @@ -29,7 +27,8 @@ impl Future for UnitError { type Output = Result; + #[pin_project(self)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.future().poll(cx).map(Ok) + self.future.poll(cx).map(Ok) } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index f10733af4f..c2bdec0795 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -53,28 +53,28 @@ macro_rules! delegate_sink { self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, ) -> $crate::core_reexport::task::Poll> { - self.$field().poll_ready(cx) + self.project().$field.poll_ready(cx) } fn start_send( self: Pin<&mut Self>, item: $item, ) -> Result<(), Self::SinkError> { - self.$field().start_send(item) + self.project().$field.start_send(item) } fn poll_flush( self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, ) -> $crate::core_reexport::task::Poll> { - self.$field().poll_flush(cx) + self.project().$field.poll_flush(cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, ) -> $crate::core_reexport::task::Poll> { - self.$field().poll_close(cx) + self.project().$field.poll_close(cx) } } } diff --git a/futures-util/src/sink/buffer.rs b/futures-util/src/sink/buffer.rs index ec88f2d209..bcd0c9c1de 100644 --- a/futures-util/src/sink/buffer.rs +++ b/futures-util/src/sink/buffer.rs @@ -1,14 +1,16 @@ +use alloc::collections::VecDeque; +use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use core::pin::Pin; -use alloc::collections::VecDeque; +use pin_project::{pin_project, unsafe_project}; /// Sink for the [`buffer`](super::SinkExt::buffer) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct Buffer, Item> { + #[pin] sink: Si, buf: VecDeque, @@ -16,13 +18,7 @@ pub struct Buffer, Item> { capacity: usize, } -impl + Unpin, Item> Unpin for Buffer {} - impl, Item> Buffer { - unsafe_pinned!(sink: Si); - unsafe_unpinned!(buf: VecDeque); - unsafe_unpinned!(capacity: usize); - pub(super) fn new(sink: Si, capacity: usize) -> Self { Buffer { sink, @@ -42,8 +38,9 @@ impl, Item> Buffer { } /// Get a pinned mutable reference to the inner sink. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut Si> { - self.sink() + self.sink } /// Consumes this combinator, returning the underlying sink. @@ -54,77 +51,84 @@ impl, Item> Buffer { self.sink } - fn try_empty_buffer( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().sink().poll_ready(cx))?; - while let Some(item) = self.as_mut().buf().pop_front() { - self.as_mut().sink().start_send(item)?; - if !self.buf.is_empty() { - ready!(self.as_mut().sink().poll_ready(cx))?; - } +} + +fn try_empty_buffer, Item>( + mut sink: Pin<&mut Si>, + buf: &mut VecDeque, + cx: &mut Context<'_>, +) -> Poll> { + ready!(sink.as_mut().poll_ready(cx))?; + while let Some(item) = buf.pop_front() { + sink.as_mut().start_send(item)?; + if !buf.is_empty() { + ready!(sink.as_mut().poll_ready(cx))?; } - Poll::Ready(Ok(())) } + Poll::Ready(Ok(())) } // Forwarding impl of Stream from the underlying sink impl Stream for Buffer where S: Sink + Stream { type Item = S::Item; + #[pin_project(self)] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sink().poll_next(cx) + self.sink.poll_next(cx) } } impl, Item> Sink for Buffer { type SinkError = Si::SinkError; + #[pin_project(self)] fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.capacity == 0 { - return self.as_mut().sink().poll_ready(cx); + if *self.capacity == 0 { + return self.sink.as_mut().poll_ready(cx); } - let _ = self.as_mut().try_empty_buffer(cx)?; + let _ = try_empty_buffer(self.sink.as_mut(), self.buf, cx)?; - if self.buf.len() >= self.capacity { + if self.buf.len() >= *self.capacity { Poll::Pending } else { Poll::Ready(Ok(())) } } + #[pin_project(self)] fn start_send( mut self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::SinkError> { - if self.capacity == 0 { - self.as_mut().sink().start_send(item) + if *self.capacity == 0 { + self.sink.start_send(item) } else { - self.as_mut().buf().push_back(item); + self.buf.push_back(item); Ok(()) } } + #[pin_project(self)] fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - ready!(self.as_mut().try_empty_buffer(cx))?; - debug_assert!(self.as_mut().buf().is_empty()); - self.as_mut().sink().poll_flush(cx) + ready!(try_empty_buffer(self.sink.as_mut(), self.buf, cx))?; + debug_assert!(self.buf.is_empty()); + self.sink.poll_flush(cx) } + #[pin_project(self)] fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - ready!(self.as_mut().try_empty_buffer(cx))?; - debug_assert!(self.as_mut().buf().is_empty()); - self.as_mut().sink().poll_close(cx) + ready!(try_empty_buffer(self.sink.as_mut(), self.buf, cx))?; + debug_assert!(self.buf.is_empty()); + self.sink.poll_close(cx) } } diff --git a/futures-util/src/sink/err_into.rs b/futures-util/src/sink/err_into.rs index ec86315fc3..60fa0f7910 100644 --- a/futures-util/src/sink/err_into.rs +++ b/futures-util/src/sink/err_into.rs @@ -3,12 +3,14 @@ use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::{Sink}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct SinkErrInto, Item, E> { + #[pin] sink: SinkMapErr E>, } @@ -16,8 +18,6 @@ impl SinkErrInto where Si: Sink, Si::SinkError: Into, { - unsafe_pinned!(sink: SinkMapErr E>); - pub(super) fn new(sink: Si) -> Self { SinkErrInto { sink: SinkExt::sink_map_err(sink, Into::into), @@ -35,8 +35,9 @@ impl SinkErrInto } /// Get a pinned mutable reference to the inner sink. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut Si> { - self.sink().get_pin_mut() + self.sink.get_pin_mut() } /// Consumes this combinator, returning the underlying sink. @@ -63,10 +64,11 @@ impl Stream for SinkErrInto { type Item = S::Item; + #[pin_project(self)] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.sink().poll_next(cx) + self.sink.poll_next(cx) } } diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index 789723494b..761fb70d5a 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -2,22 +2,22 @@ use core::fmt::{Debug, Formatter, Result as FmtResult}; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Sink that clones incoming items and forwards them to two sinks at the same time. /// /// Backpressure from any downstream sink propagates up, which means that this sink /// can only process items as fast as its _slowest_ downstream sink. +#[unsafe_project(Unpin)] #[must_use = "sinks do nothing unless polled"] pub struct Fanout { + #[pin] sink1: Si1, + #[pin] sink2: Si2 } impl Fanout { - unsafe_pinned!(sink1: Si1); - unsafe_pinned!(sink2: Si2); - pub(super) fn new(sink1: Si1, sink2: Si2) -> Fanout { Fanout { sink1, sink2 } } @@ -65,41 +65,45 @@ impl Sink for Fanout { type SinkError = Si1::SinkError; + #[pin_project(self)] fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let sink1_ready = self.as_mut().sink1().poll_ready(cx)?.is_ready(); - let sink2_ready = self.as_mut().sink2().poll_ready(cx)?.is_ready(); + let sink1_ready = self.sink1.as_mut().poll_ready(cx)?.is_ready(); + let sink2_ready = self.sink2.as_mut().poll_ready(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } + #[pin_project(self)] fn start_send( mut self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::SinkError> { - self.as_mut().sink1().start_send(item.clone())?; - self.as_mut().sink2().start_send(item)?; + self.sink1.as_mut().start_send(item.clone())?; + self.sink2.as_mut().start_send(item)?; Ok(()) } + #[pin_project(self)] fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let sink1_ready = self.as_mut().sink1().poll_flush(cx)?.is_ready(); - let sink2_ready = self.as_mut().sink2().poll_flush(cx)?.is_ready(); + let sink1_ready = self.sink1.as_mut().poll_flush(cx)?.is_ready(); + let sink2_ready = self.sink2.as_mut().poll_flush(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } + #[pin_project(self)] fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let sink1_ready = self.as_mut().sink1().poll_close(cx)?.is_ready(); - let sink2_ready = self.as_mut().sink2().poll_close(cx)?.is_ready(); + let sink1_ready = self.sink1.as_mut().poll_close(cx)?.is_ready(); + let sink2_ready = self.sink2.as_mut().poll_close(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } diff --git a/futures-util/src/stream/buffer_unordered.rs b/futures-util/src/stream/buffer_unordered.rs index a5431e6f8e..e74d27ddae 100644 --- a/futures-util/src/stream/buffer_unordered.rs +++ b/futures-util/src/stream/buffer_unordered.rs @@ -1,31 +1,27 @@ -use crate::stream::{Fuse, FuturesUnordered}; +use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use core::fmt; -use core::pin::Pin; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) /// method. +#[unsafe_project(Unpin)] #[must_use = "streams do nothing unless polled"] pub struct BufferUnordered where St: Stream, St::Item: Future, { + #[pin] stream: Fuse, in_progress_queue: FuturesUnordered, max: usize, } -impl Unpin for BufferUnordered -where - St: Stream + Unpin, - St::Item: Future, -{} - impl fmt::Debug for BufferUnordered where St: Stream + fmt::Debug, @@ -45,9 +41,6 @@ where St: Stream, St::Item: Future, { - unsafe_pinned!(stream: Fuse); - unsafe_unpinned!(in_progress_queue: FuturesUnordered); - pub(super) fn new(stream: St, n: usize) -> BufferUnordered where St: Stream, @@ -80,8 +73,9 @@ where /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream().get_pin_mut() + self.stream.get_pin_mut() } /// Consumes this combinator, returning the underlying stream. @@ -100,21 +94,22 @@ where { type Item = ::Output; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { // First up, try to spawn off as many futures as possible by filling up // our slab of futures. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut), + while self.in_progress_queue.len() < *self.max { + match self.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => self.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match Pin::new(self.as_mut().in_progress_queue()).poll_next(cx) { + match self.in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } diff --git a/futures-util/src/stream/buffered.rs b/futures-util/src/stream/buffered.rs index 61a585ab97..18549fa251 100644 --- a/futures-util/src/stream/buffered.rs +++ b/futures-util/src/stream/buffered.rs @@ -1,30 +1,26 @@ -use crate::stream::{Fuse, FuturesOrdered}; +use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use core::fmt; -use core::pin::Pin; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`buffered`](super::StreamExt::buffered) method. +#[unsafe_project(Unpin)] #[must_use = "streams do nothing unless polled"] pub struct Buffered where St: Stream, St::Item: Future, { + #[pin] stream: Fuse, in_progress_queue: FuturesOrdered, max: usize, } -impl Unpin for Buffered -where - St: Stream + Unpin, - St::Item: Future, -{} - impl fmt::Debug for Buffered where St: Stream + fmt::Debug, @@ -44,9 +40,6 @@ where St: Stream, St::Item: Future, { - unsafe_pinned!(stream: Fuse); - unsafe_unpinned!(in_progress_queue: FuturesOrdered); - pub(super) fn new(stream: St, n: usize) -> Buffered { Buffered { stream: super::Fuse::new(stream), @@ -75,8 +68,9 @@ where /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream().get_pin_mut() + self.stream.get_pin_mut() } /// Consumes this combinator, returning the underlying stream. @@ -95,21 +89,22 @@ where { type Item = ::Output; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { // Try to spawn off as many futures as possible by filling up // our in_progress_queue of futures. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut), + while self.in_progress_queue.len() < *self.max { + match self.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => self.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - let res = Pin::new(self.as_mut().in_progress_queue()).poll_next(cx); + let res = self.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { return Poll::Ready(Some(val)) } diff --git a/futures-util/src/stream/catch_unwind.rs b/futures-util/src/stream/catch_unwind.rs index 1f03c51760..a724841a94 100644 --- a/futures-util/src/stream/catch_unwind.rs +++ b/futures-util/src/stream/catch_unwind.rs @@ -1,23 +1,21 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; use std::any::Any; -use std::pin::Pin; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; -use std::prelude::v1::*; +use std::pin::Pin; /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct CatchUnwind { + #[pin] stream: St, caught_unwind: bool, } impl CatchUnwind { - unsafe_pinned!(stream: St); - unsafe_unpinned!(caught_unwind: bool); - pub(super) fn new(stream: St) -> CatchUnwind { CatchUnwind { stream, caught_unwind: false } } @@ -27,21 +25,22 @@ impl Stream for CatchUnwind { type Item = Result>; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.caught_unwind { + if *self.caught_unwind { Poll::Ready(None) } else { let res = catch_unwind(AssertUnwindSafe(|| { - self.as_mut().stream().poll_next(cx) + self.stream.as_mut().poll_next(cx) })); match res { Ok(poll) => poll.map(|opt| opt.map(Ok)), Err(e) => { - *self.as_mut().caught_unwind() = true; + *self.caught_unwind = true; Poll::Ready(Some(Err(e))) }, } diff --git a/futures-util/src/stream/chain.rs b/futures-util/src/stream/chain.rs index 084cd5a62b..620d4ccd53 100644 --- a/futures-util/src/stream/chain.rs +++ b/futures-util/src/stream/chain.rs @@ -1,13 +1,16 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`chain`](super::StreamExt::chain) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Chain { + #[pin] first: Option, + #[pin] second: St2, } @@ -16,9 +19,6 @@ impl Chain where St1: Stream, St2: Stream, { - unsafe_pinned!(first: Option); - unsafe_pinned!(second: St2); - pub(super) fn new(stream1: St1, stream2: St2) -> Chain { Chain { first: Some(stream1), @@ -39,16 +39,17 @@ where St1: Stream, { type Item = St1::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Some(first) = self.as_mut().first().as_pin_mut() { + if let Some(first) = self.first.as_mut().as_pin_mut() { if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)) } } - self.as_mut().first().set(None); - self.as_mut().second().poll_next(cx) + self.first.set(None); + self.second.poll_next(cx) } } diff --git a/futures-util/src/stream/chunks.rs b/futures-util/src/stream/chunks.rs index c4f0b466fb..d6baf2a8be 100644 --- a/futures-util/src/stream/chunks.rs +++ b/futures-util/src/stream/chunks.rs @@ -2,26 +2,23 @@ use crate::stream::Fuse; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; use core::mem; use core::pin::Pin; use alloc::vec::Vec; /// Stream for the [`chunks`](super::StreamExt::chunks) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Chunks { + #[pin] stream: Fuse, items: Vec, cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475 } -impl Unpin for Chunks {} - impl Chunks where St: Stream { - unsafe_unpinned!(items: Vec); - unsafe_pinned!(stream: Fuse); - pub(super) fn new(stream: St, capacity: usize) -> Chunks { assert!(capacity > 0); @@ -32,11 +29,6 @@ impl Chunks where St: Stream { } } - fn take(mut self: Pin<&mut Self>) -> Vec { - let cap = self.cap; - mem::replace(self.as_mut().items(), Vec::with_capacity(cap)) - } - /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &St { @@ -57,8 +49,9 @@ impl Chunks where St: Stream { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream().get_pin_mut() + self.stream.get_pin_mut() } /// Consumes this combinator, returning the underlying stream. @@ -73,19 +66,21 @@ impl Chunks where St: Stream { impl Stream for Chunks { type Item = Vec; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { loop { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(self.stream.as_mut().poll_next(cx)) { // Push the item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Some(item) => { - self.as_mut().items().push(item); - if self.items.len() >= self.cap { - return Poll::Ready(Some(self.as_mut().take())) + self.items.push(item); + if self.items.len() >= *self.cap { + let cap = *self.cap; + return Poll::Ready(Some(mem::replace(self.items, Vec::with_capacity(cap)))); } } @@ -95,7 +90,7 @@ impl Stream for Chunks { let last = if self.items.is_empty() { None } else { - let full_buf = mem::replace(self.as_mut().items(), Vec::new()); + let full_buf = mem::replace(self.items, Vec::new()); Some(full_buf) }; diff --git a/futures-util/src/stream/collect.rs b/futures-util/src/stream/collect.rs index 0952cb261b..52fd177c72 100644 --- a/futures-util/src/stream/collect.rs +++ b/futures-util/src/stream/collect.rs @@ -3,26 +3,19 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`collect`](super::StreamExt::collect) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Collect { + #[pin] stream: St, collection: C, } -impl Unpin for Collect {} - impl Collect { - unsafe_pinned!(stream: St); - unsafe_unpinned!(collection: C); - - fn finish(mut self: Pin<&mut Self>) -> C { - mem::replace(self.as_mut().collection(), Default::default()) - } - pub(super) fn new(stream: St) -> Collect { Collect { stream, @@ -43,11 +36,12 @@ where St: Stream, { type Output = C; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => self.as_mut().collection().extend(Some(e)), - None => return Poll::Ready(self.as_mut().finish()), + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(e) => self.collection.extend(Some(e)), + None => return Poll::Ready(mem::replace(self.collection, Default::default())), } } } diff --git a/futures-util/src/stream/concat.rs b/futures-util/src/stream/concat.rs index fc6dc0e1c9..af0a4527af 100644 --- a/futures-util/src/stream/concat.rs +++ b/futures-util/src/stream/concat.rs @@ -1,28 +1,24 @@ use core::fmt::{Debug, Formatter, Result as FmtResult}; use core::pin::Pin; -use core::default::Default; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`concat`](super::StreamExt::concat) method. +#[unsafe_project(Unpin)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Concat { + #[pin] stream: St, accum: Option, } -impl Unpin for Concat {} - impl Concat where St: Stream, St::Item: Extend<::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option); - pub(super) fn new(stream: St) -> Concat { Concat { stream, @@ -49,20 +45,18 @@ where St: Stream, { type Output = St::Item; - fn poll( - mut self: Pin<&mut Self>, cx: &mut Context<'_> - ) -> Poll { + #[pin_project(self)] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(self.stream.as_mut().poll_next(cx)) { None => { - return Poll::Ready(self.as_mut().accum().take().unwrap_or_default()) + return Poll::Ready(self.accum.take().unwrap_or_default()) } Some(e) => { - let accum = self.as_mut().accum(); - if let Some(a) = accum { + if let Some(a) = self.accum { a.extend(e) } else { - *accum = Some(e) + *self.accum = Some(e) } } } diff --git a/futures-util/src/stream/enumerate.rs b/futures-util/src/stream/enumerate.rs index b5a4413bc1..8bbe3667b4 100644 --- a/futures-util/src/stream/enumerate.rs +++ b/futures-util/src/stream/enumerate.rs @@ -2,22 +2,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Enumerate { + #[pin] stream: St, count: usize, } -impl Unpin for Enumerate {} - impl Enumerate { - unsafe_pinned!(stream: St); - unsafe_unpinned!(count: usize); - pub(super) fn new(stream: St) -> Enumerate { Enumerate { stream, @@ -45,8 +42,9 @@ impl Enumerate { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -67,14 +65,15 @@ impl FusedStream for Enumerate { impl Stream for Enumerate { type Item = (usize, St::Item); + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(self.stream.as_mut().poll_next(cx)) { Some(item) => { - let count = self.count; - *self.as_mut().count() += 1; + let count = *self.count; + *self.count += 1; Poll::Ready(Some((count, item))) } None => Poll::Ready(None), diff --git a/futures-util/src/stream/filter.rs b/futures-util/src/stream/filter.rs index 39c8045e16..fe3fd8c40f 100644 --- a/futures-util/src/stream/filter.rs +++ b/futures-util/src/stream/filter.rs @@ -3,9 +3,10 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`filter`](super::StreamExt::filter) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Filter @@ -13,8 +14,10 @@ pub struct Filter F: FnMut(&St::Item) -> Fut, Fut: Future, { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option, pending_item: Option, } @@ -24,11 +27,6 @@ where St: Stream, F: FnMut(&St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option); - unsafe_unpinned!(pending_item: Option); - pub(super) fn new(stream: St, f: F) -> Filter { Filter { stream, @@ -58,8 +56,9 @@ where St: Stream, /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -71,12 +70,6 @@ where St: Stream, } } -impl Unpin for Filter - where St: Stream + Unpin, - F: FnMut(&St::Item) -> Fut, - Fut: Future + Unpin, -{} - impl FusedStream for Filter where St: Stream + FusedStream, F: FnMut(&St::Item) -> Fut, @@ -94,24 +87,25 @@ impl Stream for Filter { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { loop { if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { + let item = match ready!(self.stream.as_mut().poll_next(cx)) { Some(e) => e, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + let fut = (self.f)(&item); + self.pending_fut.set(Some(fut)); + *self.pending_item = Some(item); } - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); + let yield_item = ready!(self.pending_fut.as_mut().as_pin_mut().unwrap().poll(cx)); + self.pending_fut.set(None); + let item = self.pending_item.take().unwrap(); if yield_item { return Poll::Ready(Some(item)); diff --git a/futures-util/src/stream/filter_map.rs b/futures-util/src/stream/filter_map.rs index e05e3b3967..6cda52ac96 100644 --- a/futures-util/src/stream/filter_map.rs +++ b/futures-util/src/stream/filter_map.rs @@ -3,9 +3,10 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`filter_map`](super::StreamExt::filter_map) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct FilterMap @@ -13,26 +14,18 @@ pub struct FilterMap F: FnMut(St::Item) -> Fut, Fut: Future, { + #[pin] stream: St, f: F, + #[pin] pending: Option, } -impl Unpin for FilterMap - where St: Stream + Unpin, - F: FnMut(St::Item) -> Fut, - Fut: Future + Unpin, -{} - impl FilterMap where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option); - pub(super) fn new(stream: St, f: F) -> FilterMap { FilterMap { stream, f, pending: None } } @@ -57,8 +50,9 @@ impl FilterMap /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -87,22 +81,23 @@ impl Stream for FilterMap { type Item = T; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { loop { if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { + let item = match ready!(self.stream.as_mut().poll_next(cx)) { Some(e) => e, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); + let fut = (self.f)(item); + self.pending.set(Some(fut)); } - let item = ready!(self.as_mut().pending().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending().set(None); + let item = ready!(self.pending.as_mut().as_pin_mut().unwrap().poll(cx)); + self.pending.set(None); if item.is_some() { return Poll::Ready(item); } diff --git a/futures-util/src/stream/flatten.rs b/futures-util/src/stream/flatten.rs index 428a81fe4e..f385089f5a 100644 --- a/futures-util/src/stream/flatten.rs +++ b/futures-util/src/stream/flatten.rs @@ -2,30 +2,25 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`flatten`](super::StreamExt::flatten) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Flatten where St: Stream, { + #[pin] stream: St, + #[pin] next: Option, } -impl Unpin for Flatten -where St: Stream + Unpin, - St::Item: Stream + Unpin, -{} - impl Flatten where St: Stream, St::Item: Stream, { - unsafe_pinned!(stream: St); - unsafe_pinned!(next: Option); - pub(super) fn new(stream: St) -> Flatten{ Flatten { stream, next: None, } } @@ -50,8 +45,9 @@ where St: Stream, /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -75,22 +71,23 @@ impl Stream for Flatten { type Item = ::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { loop { if self.next.is_none() { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(e) => self.as_mut().next().set(Some(e)), + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(e) => self.next.set(Some(e)), None => return Poll::Ready(None), } } - let item = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)); + let item = ready!(self.next.as_mut().as_pin_mut().unwrap().poll_next(cx)); if item.is_some() { return Poll::Ready(item); } else { - self.as_mut().next().set(None); + self.next.set(None); } } } diff --git a/futures-util/src/stream/fold.rs b/futures-util/src/stream/fold.rs index 887b9022b8..a654088ae4 100644 --- a/futures-util/src/stream/fold.rs +++ b/futures-util/src/stream/fold.rs @@ -2,30 +2,26 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`fold`](super::StreamExt::fold) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fold { + #[pin] stream: St, f: F, accum: Option, + #[pin] future: Option, } -impl Unpin for Fold {} - impl Fold where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option); - unsafe_pinned!(future: Option); - pub(super) fn new(stream: St, f: F, t: T) -> Fold { Fold { stream, @@ -49,22 +45,22 @@ impl Future for Fold { type Output = T; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { // we're currently processing a future to produce a new accum value if self.accum.is_none() { - let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); + let accum = ready!(self.future.as_mut().as_pin_mut().unwrap().poll(cx)); + *self.accum = Some(accum); + self.future.set(None); } - let item = ready!(self.as_mut().stream().poll_next(cx)); - let accum = self.as_mut().accum().take() - .expect("Fold polled after completion"); + let item = ready!(self.stream.as_mut().poll_next(cx)); + let accum = self.accum.take().expect("Fold polled after completion"); if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); + let future = (self.f)(accum, e); + self.future.set(Some(future)); } else { return Poll::Ready(accum) } diff --git a/futures-util/src/stream/for_each.rs b/futures-util/src/stream/for_each.rs index 85e07ec257..01c420ce3b 100644 --- a/futures-util/src/stream/for_each.rs +++ b/futures-util/src/stream/for_each.rs @@ -2,32 +2,25 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`for_each`](super::StreamExt::for_each) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ForEach { + #[pin] stream: St, f: F, + #[pin] future: Option, } -impl Unpin for ForEach -where St: Stream + Unpin, - F: FnMut(St::Item) -> Fut, - Fut: Future + Unpin, -{} - impl ForEach where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(future: Option); - pub(super) fn new(stream: St, f: F) -> ForEach { ForEach { stream, @@ -50,17 +43,18 @@ impl Future for ForEach { type Output = (); + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { loop { - if let Some(future) = self.as_mut().future().as_pin_mut() { + if let Some(future) = self.future.as_mut().as_pin_mut() { ready!(future.poll(cx)); } - self.as_mut().future().set(None); + self.future.set(None); - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(self.stream.as_mut().poll_next(cx)) { Some(e) => { - let future = (self.as_mut().f())(e); - self.as_mut().future().set(Some(future)); + let future = (self.f)(e); + self.future.set(Some(future)); } None => { return Poll::Ready(()); diff --git a/futures-util/src/stream/for_each_concurrent.rs b/futures-util/src/stream/for_each_concurrent.rs index 33787f26b5..0f62cb0925 100644 --- a/futures-util/src/stream/for_each_concurrent.rs +++ b/futures-util/src/stream/for_each_concurrent.rs @@ -4,34 +4,26 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ForEachConcurrent { + #[pin] stream: Option, f: F, futures: FuturesUnordered, limit: Option, } -impl Unpin for ForEachConcurrent -where St: Unpin, - Fut: Unpin, -{} - impl ForEachConcurrent where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: Option); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered); - unsafe_unpinned!(limit: Option); - pub(super) fn new(stream: St, limit: Option, f: F) -> ForEachConcurrent { ForEachConcurrent { stream: Some(stream), @@ -56,6 +48,7 @@ impl Future for ForEachConcurrent { type Output = (); + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { loop { let mut made_progress_this_iter = false; @@ -65,7 +58,7 @@ impl Future for ForEachConcurrent // Check if we've already created a number of futures greater than `limit` if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { let mut stream_completed = false; - let elem = if let Some(stream) = self.as_mut().stream().as_pin_mut() { + let elem = if let Some(stream) = self.stream.as_mut().as_pin_mut() { match stream.poll_next(cx) { Poll::Ready(Some(elem)) => { made_progress_this_iter = true; @@ -81,15 +74,15 @@ impl Future for ForEachConcurrent None }; if stream_completed { - self.as_mut().stream().set(None); + self.stream.set(None); } if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + let next_future = (self.f)(elem); + self.futures.push(next_future); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match self.futures.poll_next_unpin(cx) { Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { if self.stream.is_none() { diff --git a/futures-util/src/stream/fuse.rs b/futures-util/src/stream/fuse.rs index 45a1d8ccc5..3cda53a757 100644 --- a/futures-util/src/stream/fuse.rs +++ b/futures-util/src/stream/fuse.rs @@ -2,22 +2,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`fuse`](super::StreamExt::fuse) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Fuse { + #[pin] stream: St, done: bool, } -impl Unpin for Fuse {} - impl Fuse { - unsafe_pinned!(stream: St); - unsafe_unpinned!(done: bool); - pub(super) fn new(stream: St) -> Fuse { Fuse { stream, done: false } } @@ -51,8 +48,9 @@ impl Fuse { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -73,17 +71,18 @@ impl FusedStream for Fuse { impl Stream for Fuse { type Item = S::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.done { + if *self.done { return Poll::Ready(None); } - let item = ready!(self.as_mut().stream().poll_next(cx)); + let item = ready!(self.stream.as_mut().poll_next(cx)); if item.is_none() { - *self.as_mut().done() = true; + *self.done = true; } Poll::Ready(item) } diff --git a/futures-util/src/stream/inspect.rs b/futures-util/src/stream/inspect.rs index c87cbded73..7d5a7f4e73 100644 --- a/futures-util/src/stream/inspect.rs +++ b/futures-util/src/stream/inspect.rs @@ -2,25 +2,22 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`inspect`](super::StreamExt::inspect) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Inspect where St: Stream { + #[pin] stream: St, f: F, } -impl Unpin for Inspect {} - impl Inspect where St: Stream, F: FnMut(&St::Item), { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Inspect { Inspect { stream, f } } @@ -45,8 +42,9 @@ impl Inspect /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -77,14 +75,15 @@ impl Stream for Inspect { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .poll_next(cx) - .map(|opt| opt.map(|e| inspect(e, self.as_mut().f()))) + .map(|opt| opt.map(|e| inspect(e, self.f))) } } diff --git a/futures-util/src/stream/map.rs b/futures-util/src/stream/map.rs index 2f7df67a47..cf63e2688c 100644 --- a/futures-util/src/stream/map.rs +++ b/futures-util/src/stream/map.rs @@ -2,25 +2,22 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`map`](super::StreamExt::map) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Map { + #[pin] stream: St, f: F, } -impl Unpin for Map {} - impl Map where St: Stream, F: FnMut(St::Item) -> T, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Map { Map { stream, f } } @@ -45,8 +42,9 @@ impl Map /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -70,14 +68,15 @@ impl Stream for Map { type Item = T; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .poll_next(cx) - .map(|opt| opt.map(|x| self.as_mut().f()(x))) + .map(|opt| opt.map(|x| (self.f)(x))) } } diff --git a/futures-util/src/stream/once.rs b/futures-util/src/stream/once.rs index 6cb7a30ee7..c407b678f5 100644 --- a/futures-util/src/stream/once.rs +++ b/futures-util/src/stream/once.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Creates a stream of single element /// @@ -24,31 +24,28 @@ pub fn once(future: Fut) -> Once { /// A stream which emits single element and then EOF. /// /// This stream will never block and is always ready. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Once { + #[pin] future: Option } -impl Unpin for Once {} - -impl Once { - unsafe_pinned!(future: Option); -} - impl Stream for Once { type Item = Fut::Output; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let val = if let Some(f) = self.as_mut().future().as_pin_mut() { + let val = if let Some(f) = self.future.as_mut().as_pin_mut() { ready!(f.poll(cx)) } else { return Poll::Ready(None) }; - self.future().set(None); + self.future.set(None); Poll::Ready(Some(val)) } } diff --git a/futures-util/src/stream/peek.rs b/futures-util/src/stream/peek.rs index 7bc4765985..3b9e2d410d 100644 --- a/futures-util/src/stream/peek.rs +++ b/futures-util/src/stream/peek.rs @@ -3,26 +3,23 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// A `Stream` that implements a `peek` method. /// /// The `peek` method can be used to retrieve a reference /// to the next `Stream::Item` if available. A subsequent /// call to `poll` will return the owned item. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Peekable { + #[pin] stream: Fuse, peeked: Option, } -impl Unpin for Peekable {} - impl Peekable { - unsafe_pinned!(stream: Fuse); - unsafe_unpinned!(peeked: Option); - pub(super) fn new(stream: St) -> Peekable { Peekable { stream: stream.fuse(), @@ -50,8 +47,9 @@ impl Peekable { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream().get_pin_mut() + self.stream.get_pin_mut() } /// Consumes this combinator, returning the underlying stream. @@ -66,20 +64,19 @@ impl Peekable { /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. + #[pin_project(self)] pub fn peek<'a>( mut self: Pin<&'a mut Self>, cx: &mut Context<'_>, ) -> Poll> { if self.peeked.is_some() { - let this: &Self = self.into_ref().get_ref(); - return Poll::Ready(this.peeked.as_ref()) + return Poll::Ready(self.peeked.as_ref()) } - match ready!(self.as_mut().stream().poll_next(cx)) { + match ready!(self.stream.as_mut().poll_next(cx)) { None => Poll::Ready(None), Some(item) => { - *self.as_mut().peeked() = Some(item); - let this: &Self = self.into_ref().get_ref(); - Poll::Ready(this.peeked.as_ref()) + *self.peeked = Some(item); + Poll::Ready(self.peeked.as_ref()) } } } @@ -94,14 +91,15 @@ impl FusedStream for Peekable { impl Stream for Peekable { type Item = S::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Some(item) = self.as_mut().peeked().take() { + if let Some(item) = self.peeked.take() { return Poll::Ready(Some(item)) } - self.as_mut().stream().poll_next(cx) + self.stream.poll_next(cx) } } diff --git a/futures-util/src/stream/skip.rs b/futures-util/src/stream/skip.rs index 2bd72f1843..6db50160d4 100644 --- a/futures-util/src/stream/skip.rs +++ b/futures-util/src/stream/skip.rs @@ -2,22 +2,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`skip`](super::StreamExt::skip) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Skip { + #[pin] stream: St, remaining: u64, } -impl Unpin for Skip {} - impl Skip { - unsafe_pinned!(stream: St); - unsafe_unpinned!(remaining: u64); - pub(super) fn new(stream: St, n: u64) -> Skip { Skip { stream, @@ -45,8 +42,9 @@ impl Skip { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -67,18 +65,19 @@ impl FusedStream for Skip { impl Stream for Skip { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - while self.remaining > 0 { - match ready!(self.as_mut().stream().poll_next(cx)) { - Some(_) => *self.as_mut().remaining() -= 1, + while *self.remaining > 0 { + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(_) => *self.remaining -= 1, None => return Poll::Ready(None), } } - self.as_mut().stream().poll_next(cx) + self.stream.poll_next(cx) } } diff --git a/futures-util/src/stream/skip_while.rs b/futures-util/src/stream/skip_while.rs index 16f2cbc0e3..36b12481e6 100644 --- a/futures-util/src/stream/skip_while.rs +++ b/futures-util/src/stream/skip_while.rs @@ -3,32 +3,27 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct SkipWhile where St: Stream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option, pending_item: Option, done_skipping: bool, } -impl Unpin for SkipWhile {} - impl SkipWhile where St: Stream, F: FnMut(&St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option); - unsafe_unpinned!(pending_item: Option); - unsafe_unpinned!(done_skipping: bool); - pub(super) fn new(stream: St, f: F) -> SkipWhile { SkipWhile { stream, @@ -59,8 +54,9 @@ impl SkipWhile /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -85,31 +81,32 @@ impl Stream for SkipWhile { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.done_skipping { - return self.as_mut().stream().poll_next(cx); + if *self.done_skipping { + return self.stream.poll_next(cx); } loop { if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { + let item = match ready!(self.stream.as_mut().poll_next(cx)) { Some(e) => e, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + let fut = (self.f)(&item); + self.pending_fut.set(Some(fut)); + *self.pending_item = Some(item); } - let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - let item = self.as_mut().pending_item().take().unwrap(); - self.as_mut().pending_fut().set(None); + let skipped = ready!(self.pending_fut.as_mut().as_pin_mut().unwrap().poll(cx)); + let item = self.pending_item.take().unwrap(); + self.pending_fut.set(None); if !skipped { - *self.as_mut().done_skipping() = true; + *self.done_skipping = true; return Poll::Ready(Some(item)) } } diff --git a/futures-util/src/stream/take.rs b/futures-util/src/stream/take.rs index 9d641900a4..09e636f56b 100644 --- a/futures-util/src/stream/take.rs +++ b/futures-util/src/stream/take.rs @@ -2,22 +2,19 @@ use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`take`](super::StreamExt::take) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Take { + #[pin] stream: St, remaining: u64, } -impl Unpin for Take {} - impl Take { - unsafe_pinned!(stream: St); - unsafe_unpinned!(remaining: u64); - pub(super) fn new(stream: St, n: u64) -> Take { Take { stream, @@ -45,8 +42,9 @@ impl Take { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -63,17 +61,18 @@ impl Stream for Take { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.remaining == 0 { + if *self.remaining == 0 { Poll::Ready(None) } else { - let next = ready!(self.as_mut().stream().poll_next(cx)); + let next = ready!(self.stream.as_mut().poll_next(cx)); match next { - Some(_) => *self.as_mut().remaining() -= 1, - None => *self.as_mut().remaining() = 0, + Some(_) => *self.remaining -= 1, + None => *self.remaining = 0, } Poll::Ready(next) } diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index ba5557e10e..501ce86ee7 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -3,32 +3,27 @@ use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`take_while`](super::StreamExt::take_while) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TakeWhile { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option, pending_item: Option, done_taking: bool, } -impl Unpin for TakeWhile {} - impl TakeWhile where St: Stream, F: FnMut(&St::Item) -> Fut, Fut: Future, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option); - unsafe_unpinned!(pending_item: Option); - unsafe_unpinned!(done_taking: bool); - pub(super) fn new(stream: St, f: F) -> TakeWhile { TakeWhile { stream, @@ -59,8 +54,9 @@ impl TakeWhile /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -79,32 +75,33 @@ impl Stream for TakeWhile { type Item = St::Item; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.done_taking { + if *self.done_taking { return Poll::Ready(None); } if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { + let item = match ready!(self.stream.as_mut().poll_next(cx)) { Some(e) => e, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + let fut = (self.f)(&item); + self.pending_fut.set(Some(fut)); + *self.pending_item = Some(item); } - let take = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); + let take = ready!(self.pending_fut.as_mut().as_pin_mut().unwrap().poll(cx)); + self.pending_fut.set(None); + let item = self.pending_item.take().unwrap(); if take { Poll::Ready(Some(item)) } else { - *self.as_mut().done_taking() = true; + *self.done_taking = true; Poll::Ready(None) } } diff --git a/futures-util/src/stream/then.rs b/futures-util/src/stream/then.rs index a3c46d4909..8caffdf71a 100644 --- a/futures-util/src/stream/then.rs +++ b/futures-util/src/stream/then.rs @@ -3,27 +3,24 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`then`](super::StreamExt::then) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Then { + #[pin] stream: St, + #[pin] future: Option, f: F, } -impl Unpin for Then {} - impl Then where St: Stream, F: FnMut(St::Item) -> Fut, { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Then { Then { stream, @@ -52,8 +49,9 @@ impl Then /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -78,21 +76,22 @@ impl Stream for Then { type Item = Fut::Output; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if self.future.is_none() { - let item = match ready!(self.as_mut().stream().poll_next(cx)) { + let item = match ready!(self.stream.as_mut().poll_next(cx)) { None => return Poll::Ready(None), Some(e) => e, }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); + let fut = (self.f)(item); + self.future.set(Some(fut)); } - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - self.as_mut().future().set(None); + let e = ready!(self.future.as_mut().as_pin_mut().unwrap().poll(cx)); + self.future.set(None); Poll::Ready(Some(e)) } } diff --git a/futures-util/src/stream/unfold.rs b/futures-util/src/stream/unfold.rs index f721301179..2a576b8251 100644 --- a/futures-util/src/stream/unfold.rs +++ b/futures-util/src/stream/unfold.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// @@ -61,22 +61,16 @@ pub fn unfold(init: T, f: F) -> Unfold } /// Stream for the [`unfold`] function. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Unfold { f: F, state: Option, + #[pin] fut: Option, } -impl Unpin for Unfold {} - -impl Unfold { - unsafe_unpinned!(f: F); - unsafe_unpinned!(state: Option); - unsafe_pinned!(fut: Option); -} - impl FusedStream for Unfold { fn is_terminated(&self) -> bool { self.state.is_none() && self.fut.is_none() @@ -89,20 +83,21 @@ impl Stream for Unfold { type Item = It; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Some(state) = self.as_mut().state().take() { - let fut = (self.as_mut().f())(state); - self.as_mut().fut().set(Some(fut)); + if let Some(state) = self.state.take() { + let fut = (self.f)(state); + self.fut.set(Some(fut)); } - let step = ready!(self.as_mut().fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().fut().set(None); + let step = ready!(self.fut.as_mut().as_pin_mut().unwrap().poll(cx)); + self.fut.set(None); if let Some((item, next_state)) = step { - *self.as_mut().state() = Some(next_state); + *self.state = Some(next_state); return Poll::Ready(Some(item)) } else { return Poll::Ready(None) diff --git a/futures-util/src/stream/zip.rs b/futures-util/src/stream/zip.rs index 69ef006feb..046d2d5d91 100644 --- a/futures-util/src/stream/zip.rs +++ b/futures-util/src/stream/zip.rs @@ -2,32 +2,22 @@ use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`zip`](super::StreamExt::zip) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Zip { + #[pin] stream1: Fuse, + #[pin] stream2: Fuse, queued1: Option, queued2: Option, } -impl Unpin for Zip -where - St1: Stream, - Fuse: Unpin, - St2: Stream, - Fuse: Unpin, -{} - impl Zip { - unsafe_pinned!(stream1: Fuse); - unsafe_pinned!(stream2: Fuse); - unsafe_unpinned!(queued1: Option); - unsafe_unpinned!(queued2: Option); - pub(super) fn new(stream1: St1, stream2: St2) -> Zip { Zip { stream1: stream1.fuse(), @@ -86,26 +76,27 @@ impl Stream for Zip { type Item = (St1::Item, St2::Item); + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if self.queued1.is_none() { - match self.as_mut().stream1().poll_next(cx) { - Poll::Ready(Some(item1)) => *self.as_mut().queued1() = Some(item1), + match self.stream1.as_mut().poll_next(cx) { + Poll::Ready(Some(item1)) => *self.queued1 = Some(item1), Poll::Ready(None) | Poll::Pending => {} } } if self.queued2.is_none() { - match self.as_mut().stream2().poll_next(cx) { - Poll::Ready(Some(item2)) => *self.as_mut().queued2() = Some(item2), + match self.stream2.as_mut().poll_next(cx) { + Poll::Ready(Some(item2)) => *self.queued2 = Some(item2), Poll::Ready(None) | Poll::Pending => {} } } if self.queued1.is_some() && self.queued2.is_some() { - let pair = (self.as_mut().queued1().take().unwrap(), - self.as_mut().queued2().take().unwrap()); + let pair = (self.queued1.take().unwrap(), + self.queued2.take().unwrap()); Poll::Ready(Some(pair)) } else if self.stream1.is_done() || self.stream2.is_done() { Poll::Ready(None) diff --git a/futures-util/src/try_future/and_then.rs b/futures-util/src/try_future/and_then.rs index 37333e0503..eb702e4c29 100644 --- a/futures-util/src/try_future/and_then.rs +++ b/futures-util/src/try_future/and_then.rs @@ -2,12 +2,14 @@ use super::{TryChain, TryChainAction}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`and_then`](super::TryFutureExt::and_then) method. +#[unsafe_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct AndThen { + #[pin] try_chain: TryChain, } @@ -15,9 +17,6 @@ impl AndThen where Fut1: TryFuture, Fut2: TryFuture, { - unsafe_pinned!(try_chain: TryChain); - - /// Creates a new `Then`. pub(super) fn new(future: Fut1, f: F) -> AndThen { AndThen { try_chain: TryChain::new(future, f), @@ -42,8 +41,9 @@ impl Future for AndThen { type Output = Result; + #[pin_project(self)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.try_chain().poll(cx, |result, async_op| { + self.try_chain.poll(cx, |result, async_op| { match result { Ok(ok) => TryChainAction::Future(async_op(ok)), Err(err) => TryChainAction::Output(Err(err)), diff --git a/futures-util/src/try_future/err_into.rs b/futures-util/src/try_future/err_into.rs index 2f669fc54f..026fca86de 100644 --- a/futures-util/src/try_future/err_into.rs +++ b/futures-util/src/try_future/err_into.rs @@ -2,21 +2,19 @@ use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`err_into`](super::TryFutureExt::err_into) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ErrInto { + #[pin] future: Fut, _marker: PhantomData, } -impl Unpin for ErrInto {} - impl ErrInto { - unsafe_pinned!(future: Fut); - pub(super) fn new(future: Fut) -> ErrInto { ErrInto { future, @@ -35,11 +33,12 @@ impl Future for ErrInto { type Output = Result; + #[pin_project(self)] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.future().try_poll(cx) + self.future.try_poll(cx) .map(|res| res.map_err(Into::into)) } } diff --git a/futures-util/src/try_future/into_future.rs b/futures-util/src/try_future/into_future.rs index 2e998206ed..7dd7aa86ad 100644 --- a/futures-util/src/try_future/into_future.rs +++ b/futures-util/src/try_future/into_future.rs @@ -1,18 +1,18 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`into_future`](super::TryFutureExt::into_future) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct IntoFuture { + #[pin] future: Fut, } impl IntoFuture { - unsafe_pinned!(future: Fut); - #[inline] pub(super) fn new(future: Fut) -> IntoFuture { IntoFuture { future } @@ -26,11 +26,12 @@ impl FusedFuture for IntoFuture { impl Future for IntoFuture { type Output = Result; + #[pin_project(self)] #[inline] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.future().try_poll(cx) + self.future.try_poll(cx) } } diff --git a/futures-util/src/try_future/map_err.rs b/futures-util/src/try_future/map_err.rs index 1095fbd352..f0d4dadcae 100644 --- a/futures-util/src/try_future/map_err.rs +++ b/futures-util/src/try_future/map_err.rs @@ -1,28 +1,24 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`map_err`](super::TryFutureExt::map_err) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct MapErr { + #[pin] future: Fut, f: Option, } impl MapErr { - unsafe_pinned!(future: Fut); - unsafe_unpinned!(f: Option); - - /// Creates a new MapErr. pub(super) fn new(future: Fut, f: F) -> MapErr { MapErr { future, f: Some(f) } } } -impl Unpin for MapErr {} - impl FusedFuture for MapErr { fn is_terminated(&self) -> bool { self.f.is_none() } } @@ -33,15 +29,16 @@ impl Future for MapErr { type Output = Result; + #[pin_project(self)] fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.as_mut() - .future() + self.future + .as_mut() .try_poll(cx) .map(|result| { - let f = self.as_mut().f().take() + let f = self.f.take() .expect("MapErr must not be polled after it returned `Poll::Ready`"); result.map_err(f) }) diff --git a/futures-util/src/try_future/map_ok.rs b/futures-util/src/try_future/map_ok.rs index 3debb726c0..050c70082b 100644 --- a/futures-util/src/try_future/map_ok.rs +++ b/futures-util/src/try_future/map_ok.rs @@ -1,28 +1,24 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`map_ok`](super::TryFutureExt::map_ok) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct MapOk { + #[pin] future: Fut, f: Option, } impl MapOk { - unsafe_pinned!(future: Fut); - unsafe_unpinned!(f: Option); - - /// Creates a new MapOk. pub(super) fn new(future: Fut, f: F) -> MapOk { MapOk { future, f: Some(f) } } } -impl Unpin for MapOk {} - impl FusedFuture for MapOk { fn is_terminated(&self) -> bool { self.f.is_none() @@ -35,15 +31,16 @@ impl Future for MapOk { type Output = Result; + #[pin_project(self)] fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.as_mut() - .future() + self.future + .as_mut() .try_poll(cx) .map(|result| { - let op = self.as_mut().f().take() + let op = self.f.take() .expect("MapOk must not be polled after it returned `Poll::Ready`"); result.map(op) }) diff --git a/futures-util/src/try_future/or_else.rs b/futures-util/src/try_future/or_else.rs index a9c006fa9f..d07484b7cb 100644 --- a/futures-util/src/try_future/or_else.rs +++ b/futures-util/src/try_future/or_else.rs @@ -2,12 +2,14 @@ use super::{TryChain, TryChainAction}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`or_else`](super::TryFutureExt::or_else) method. +#[unsafe_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct OrElse { + #[pin] try_chain: TryChain, } @@ -15,9 +17,6 @@ impl OrElse where Fut1: TryFuture, Fut2: TryFuture, { - unsafe_pinned!(try_chain: TryChain); - - /// Creates a new `Then`. pub(super) fn new(future: Fut1, f: F) -> OrElse { OrElse { try_chain: TryChain::new(future, f), @@ -42,11 +41,12 @@ impl Future for OrElse { type Output = Result; + #[pin_project(self)] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.try_chain().poll(cx, |result, async_op| { + self.try_chain.poll(cx, |result, async_op| { match result { Ok(ok) => TryChainAction::Output(Ok(ok)), Err(err) => TryChainAction::Future(async_op(err)), diff --git a/futures-util/src/try_future/unwrap_or_else.rs b/futures-util/src/try_future/unwrap_or_else.rs index d0b80eb872..46e5de58f9 100644 --- a/futures-util/src/try_future/unwrap_or_else.rs +++ b/futures-util/src/try_future/unwrap_or_else.rs @@ -1,29 +1,25 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`unwrap_or_else`](super::TryFutureExt::unwrap_or_else) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct UnwrapOrElse { + #[pin] future: Fut, f: Option, } impl UnwrapOrElse { - unsafe_pinned!(future: Fut); - unsafe_unpinned!(f: Option); - - /// Creates a new UnwrapOrElse. pub(super) fn new(future: Fut, f: F) -> UnwrapOrElse { UnwrapOrElse { future, f: Some(f) } } } -impl Unpin for UnwrapOrElse {} - impl FusedFuture for UnwrapOrElse { fn is_terminated(&self) -> bool { self.f.is_none() @@ -36,15 +32,16 @@ impl Future for UnwrapOrElse { type Output = Fut::Ok; + #[pin_project(self)] fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - self.as_mut() - .future() + self.future + .as_mut() .try_poll(cx) .map(|result| { - let op = self.as_mut().f().take() + let op = self.f.take() .expect("UnwrapOrElse already returned `Poll::Ready` before"); result.unwrap_or_else(op) }) diff --git a/futures-util/src/try_stream/and_then.rs b/futures-util/src/try_stream/and_then.rs index 39b1356928..0a9bf7c092 100644 --- a/futures-util/src/try_stream/and_then.rs +++ b/futures-util/src/try_stream/and_then.rs @@ -3,28 +3,25 @@ use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct AndThen { + #[pin] stream: St, + #[pin] future: Option, f: F, } -impl Unpin for AndThen {} - impl AndThen where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } } @@ -49,8 +46,9 @@ impl AndThen /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -69,21 +67,22 @@ impl Stream for AndThen { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)?) { None => return Poll::Ready(None), Some(e) => e, }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); + let fut = (self.f)(item); + self.future.set(Some(fut)); } - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); + let e = ready!(self.future.as_mut().as_pin_mut().unwrap().try_poll(cx)); + self.future.set(None); Poll::Ready(Some(e)) } } diff --git a/futures-util/src/try_stream/err_into.rs b/futures-util/src/try_stream/err_into.rs index 05f6878b37..a0f1cba3fb 100644 --- a/futures-util/src/try_stream/err_into.rs +++ b/futures-util/src/try_stream/err_into.rs @@ -3,21 +3,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct ErrInto { + #[pin] stream: St, _marker: PhantomData, } -impl Unpin for ErrInto {} - impl ErrInto { - unsafe_pinned!(stream: St); - pub(super) fn new(stream: St) -> Self { ErrInto { stream, _marker: PhantomData } } @@ -42,8 +40,9 @@ impl ErrInto { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -68,11 +67,13 @@ where { type Item = Result; + #[pin_project(self)] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.stream().try_poll_next(cx) + self.stream + .try_poll_next(cx) .map(|res| res.map(|some| some.map_err(Into::into))) } } diff --git a/futures-util/src/try_stream/inspect_err.rs b/futures-util/src/try_stream/inspect_err.rs index ca90c4a345..75fd2c8959 100644 --- a/futures-util/src/try_stream/inspect_err.rs +++ b/futures-util/src/try_stream/inspect_err.rs @@ -3,26 +3,23 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct InspectErr { + #[pin] stream: St, f: F, } -impl Unpin for InspectErr {} - impl InspectErr where St: TryStream, F: FnMut(&St::Error), { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, f } } @@ -47,8 +44,9 @@ where /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -73,14 +71,15 @@ where { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f())))) + .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.f)))) } } diff --git a/futures-util/src/try_stream/inspect_ok.rs b/futures-util/src/try_stream/inspect_ok.rs index 30060d2412..286821a292 100644 --- a/futures-util/src/try_stream/inspect_ok.rs +++ b/futures-util/src/try_stream/inspect_ok.rs @@ -3,26 +3,23 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct InspectOk { + #[pin] stream: St, f: F, } -impl Unpin for InspectOk {} - impl InspectOk where St: TryStream, F: FnMut(&St::Ok), { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, f } } @@ -47,8 +44,9 @@ where /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -73,14 +71,15 @@ where { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f())))) + .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.f)))) } } diff --git a/futures-util/src/try_stream/into_stream.rs b/futures-util/src/try_stream/into_stream.rs index a6faadfa46..effab8a364 100644 --- a/futures-util/src/try_stream/into_stream.rs +++ b/futures-util/src/try_stream/into_stream.rs @@ -2,18 +2,18 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct IntoStream { + #[pin] stream: St, } impl IntoStream { - unsafe_pinned!(stream: St); - #[inline] pub(super) fn new(stream: St) -> Self { IntoStream { stream } @@ -39,8 +39,9 @@ impl IntoStream { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -61,12 +62,13 @@ impl FusedStream for IntoStream { impl Stream for IntoStream { type Item = Result; + #[pin_project(self)] #[inline] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.stream().try_poll_next(cx) + self.stream.try_poll_next(cx) } } diff --git a/futures-util/src/try_stream/map_err.rs b/futures-util/src/try_stream/map_err.rs index faf09e6b59..ef034f0f4d 100644 --- a/futures-util/src/try_stream/map_err.rs +++ b/futures-util/src/try_stream/map_err.rs @@ -2,20 +2,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct MapErr { + #[pin] stream: St, f: F, } impl MapErr { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - /// Creates a new MapErr. pub(super) fn new(stream: St, f: F) -> Self { MapErr { stream, f } @@ -41,8 +40,9 @@ impl MapErr { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -54,8 +54,6 @@ impl MapErr { } } -impl Unpin for MapErr {} - impl FusedStream for MapErr { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -69,14 +67,15 @@ where { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e)))) + .map(|opt| opt.map(|res| res.map_err(|e| (self.f)(e)))) } } diff --git a/futures-util/src/try_stream/map_ok.rs b/futures-util/src/try_stream/map_ok.rs index 02ef729185..58d913472e 100644 --- a/futures-util/src/try_stream/map_ok.rs +++ b/futures-util/src/try_stream/map_ok.rs @@ -2,20 +2,19 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct MapOk { + #[pin] stream: St, f: F, } impl MapOk { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - /// Creates a new MapOk. pub(super) fn new(stream: St, f: F) -> Self { MapOk { stream, f } @@ -41,8 +40,9 @@ impl MapOk { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -54,8 +54,6 @@ impl MapOk { } } -impl Unpin for MapOk {} - impl FusedStream for MapOk { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -69,14 +67,15 @@ where { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.as_mut() - .stream() + self.stream + .as_mut() .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x)))) + .map(|opt| opt.map(|res| res.map(|x| (self.f)(x)))) } } diff --git a/futures-util/src/try_stream/or_else.rs b/futures-util/src/try_stream/or_else.rs index c811722b46..8df4ee9a7f 100644 --- a/futures-util/src/try_stream/or_else.rs +++ b/futures-util/src/try_stream/or_else.rs @@ -3,28 +3,25 @@ use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct OrElse { + #[pin] stream: St, + #[pin] future: Option, f: F, } -impl Unpin for OrElse {} - impl OrElse where St: TryStream, F: FnMut(St::Error) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } } @@ -49,8 +46,9 @@ impl OrElse /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -69,22 +67,23 @@ impl Stream for OrElse { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { + if self.future.is_none() { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)) { None => return Poll::Ready(None), Some(Ok(e)) => return Poll::Ready(Some(Ok(e))), Some(Err(e)) => e, }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); + let fut = (self.f)(item); + self.future.set(Some(fut)); } - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); + let e = ready!(self.future.as_mut().as_pin_mut().unwrap().try_poll(cx)); + self.future.set(None); Poll::Ready(Some(e)) } } diff --git a/futures-util/src/try_stream/try_buffer_unordered.rs b/futures-util/src/try_stream/try_buffer_unordered.rs index f6f096aff0..0f146fdeaf 100644 --- a/futures-util/src/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/try_stream/try_buffer_unordered.rs @@ -5,32 +5,27 @@ use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; use core::pin::Pin; +use pin_project::{pin_project, unsafe_project}; /// Stream for the /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryBufferUnordered where St: TryStream { + #[pin] stream: Fuse>, in_progress_queue: FuturesUnordered>, max: usize, } -impl Unpin for TryBufferUnordered - where St: TryStream + Unpin -{} - impl TryBufferUnordered where St: TryStream, St::Ok: TryFuture, { - unsafe_pinned!(stream: Fuse>); - unsafe_unpinned!(in_progress_queue: FuturesUnordered>); - pub(super) fn new(stream: St, n: usize) -> Self { TryBufferUnordered { stream: IntoStream::new(stream).fuse(), @@ -59,8 +54,9 @@ impl TryBufferUnordered /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream().get_pin_mut().get_pin_mut() + self.stream.get_pin_mut().get_pin_mut() } /// Consumes this combinator, returning the underlying stream. @@ -78,21 +74,22 @@ impl Stream for TryBufferUnordered { type Item = Result<::Ok, St::Error>; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { // First up, try to spawn off as many futures as possible by filling up // our slab of futures. Propagate errors from the stream immediately. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx)? { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut.into_future()), + while self.in_progress_queue.len() < *self.max { + match self.stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => self.in_progress_queue.push(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match Pin::new(self.as_mut().in_progress_queue()).poll_next(cx) { + match Pin::new(self.in_progress_queue).poll_next(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } diff --git a/futures-util/src/try_stream/try_collect.rs b/futures-util/src/try_stream/try_collect.rs index 9d6d04b576..31398a6729 100644 --- a/futures-util/src/try_stream/try_collect.rs +++ b/futures-util/src/try_stream/try_collect.rs @@ -3,34 +3,27 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, TryStream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryCollect { + #[pin] stream: St, items: C, } impl TryCollect { - unsafe_pinned!(stream: St); - unsafe_unpinned!(items: C); - pub(super) fn new(s: St) -> TryCollect { TryCollect { stream: s, items: Default::default(), } } - - fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.items(), Default::default()) - } } -impl Unpin for TryCollect {} - impl FusedFuture for TryCollect { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -42,14 +35,15 @@ impl Future for TryCollect { type Output = Result; + #[pin_project(self)] fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => self.as_mut().items().extend(Some(x)), - None => return Poll::Ready(Ok(self.as_mut().finish())), + match ready!(self.stream.as_mut().try_poll_next(cx)?) { + Some(x) => self.items.extend(Some(x)), + None => return Poll::Ready(Ok(mem::replace(self.items, Default::default()))), } } } diff --git a/futures-util/src/try_stream/try_concat.rs b/futures-util/src/try_stream/try_concat.rs index f4774fe242..a45d62373c 100644 --- a/futures-util/src/try_stream/try_concat.rs +++ b/futures-util/src/try_stream/try_concat.rs @@ -3,26 +3,23 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryConcat { + #[pin] stream: St, accum: Option, } -impl Unpin for TryConcat {} - impl TryConcat where St: TryStream, St::Ok: Extend<::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option); - pub(super) fn new(stream: St) -> TryConcat { TryConcat { stream, @@ -38,19 +35,19 @@ where { type Output = Result; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { + match ready!(self.stream.as_mut().try_poll_next(cx)?) { Some(x) => { - let accum = self.as_mut().accum(); - if let Some(a) = accum { + if let Some(a) = self.accum { a.extend(x) } else { - *accum = Some(x) + *self.accum = Some(x) } }, None => { - return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default())) + return Poll::Ready(Ok(self.accum.take().unwrap_or_default())) } } } diff --git a/futures-util/src/try_stream/try_filter.rs b/futures-util/src/try_stream/try_filter.rs index 65be194f94..544e425062 100644 --- a/futures-util/src/try_stream/try_filter.rs +++ b/futures-util/src/try_stream/try_filter.rs @@ -3,33 +3,27 @@ use futures_core::future::Future; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryFilter where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option, pending_item: Option, } -impl Unpin for TryFilter - where St: TryStream + Unpin, Fut: Unpin, -{} - impl TryFilter where St: TryStream { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option); - unsafe_unpinned!(pending_item: Option); - pub(super) fn new(stream: St, f: F) -> Self { TryFilter { stream, @@ -59,8 +53,9 @@ impl TryFilter /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -89,24 +84,25 @@ impl Stream for TryFilter { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { loop { if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)?) { Some(x) => x, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + let fut = (self.f)(&item); + self.pending_fut.set(Some(fut)); + *self.pending_item = Some(item); } - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); + let yield_item = ready!(self.pending_fut.as_mut().as_pin_mut().unwrap().poll(cx)); + self.pending_fut.set(None); + let item = self.pending_item.take().unwrap(); if yield_item { return Poll::Ready(Some(Ok(item))); diff --git a/futures-util/src/try_stream/try_filter_map.rs b/futures-util/src/try_stream/try_filter_map.rs index ce5e092fd2..9a61cade4e 100644 --- a/futures-util/src/try_stream/try_filter_map.rs +++ b/futures-util/src/try_stream/try_filter_map.rs @@ -3,27 +3,22 @@ use futures_core::future::{TryFuture}; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryFilterMap { + #[pin] stream: St, f: F, + #[pin] pending: Option, } -impl Unpin for TryFilterMap - where St: Unpin, Fut: Unpin, -{} - impl TryFilterMap { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option); - pub(super) fn new(stream: St, f: F) -> Self { TryFilterMap { stream, f, pending: None } } @@ -48,8 +43,9 @@ impl TryFilterMap { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -78,22 +74,23 @@ impl Stream for TryFilterMap { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { loop { if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)?) { Some(x) => x, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); + let fut = (self.f)(item); + self.pending.set(Some(fut)); } - let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().pending().set(None); + let result = ready!(self.pending.as_mut().as_pin_mut().unwrap().try_poll(cx)); + self.pending.set(None); if let Some(x) = result? { return Poll::Ready(Some(Ok(x))); } diff --git a/futures-util/src/try_stream/try_fold.rs b/futures-util/src/try_stream/try_fold.rs index 6ed91861e2..f0a300a916 100644 --- a/futures-util/src/try_stream/try_fold.rs +++ b/futures-util/src/try_stream/try_fold.rs @@ -2,30 +2,26 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryFold { + #[pin] stream: St, f: F, accum: Option, + #[pin] future: Option, } -impl Unpin for TryFold {} - impl TryFold where St: TryStream, F: FnMut(T, St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option); - unsafe_pinned!(future: Option); - pub(super) fn new(stream: St, f: F, t: T) -> TryFold { TryFold { stream, @@ -49,40 +45,41 @@ impl Future for TryFold { type Output = Result; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { // we're currently processing a future to produce a new accum value if self.accum.is_none() { let accum = match ready!( - self.as_mut().future().as_pin_mut() + self.future.as_mut().as_pin_mut() .expect("TryFold polled after completion") .try_poll(cx) ) { Ok(accum) => accum, Err(e) => { // Indicate that the future can no longer be polled. - self.as_mut().future().set(None); + self.future.set(None); return Poll::Ready(Err(e)); } }; - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); + *self.accum = Some(accum); + self.future.set(None); } - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)) { Some(Ok(item)) => Some(item), Some(Err(e)) => { // Indicate that the future can no longer be polled. - *self.as_mut().accum() = None; + *self.accum = None; return Poll::Ready(Err(e)); } None => None, }; - let accum = self.as_mut().accum().take().unwrap(); + let accum = self.accum.take().unwrap(); if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); + let future = (self.f)(accum, e); + self.future.set(Some(future)); } else { return Poll::Ready(Ok(accum)) } diff --git a/futures-util/src/try_stream/try_for_each.rs b/futures-util/src/try_stream/try_for_each.rs index c30dad19f8..af2d5c830c 100644 --- a/futures-util/src/try_stream/try_for_each.rs +++ b/futures-util/src/try_stream/try_for_each.rs @@ -2,28 +2,25 @@ use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEach { + #[pin] stream: St, f: F, + #[pin] future: Option, } -impl Unpin for TryForEach {} - impl TryForEach where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(future: Option); - pub(super) fn new(stream: St, f: F) -> TryForEach { TryForEach { stream, @@ -40,17 +37,18 @@ impl Future for TryForEach { type Output = Result<(), St::Error>; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - if let Some(future) = self.as_mut().future().as_pin_mut() { + if let Some(future) = self.future.as_mut().as_pin_mut() { ready!(future.try_poll(cx))?; } - self.as_mut().future().set(None); + self.future.set(None); - match ready!(self.as_mut().stream().try_poll_next(cx)?) { + match ready!(self.stream.as_mut().try_poll_next(cx)?) { Some(e) => { - let future = (self.as_mut().f())(e); - self.as_mut().future().set(Some(future)); + let future = (self.f)(e); + self.future.set(Some(future)); } None => return Poll::Ready(Ok(())), } diff --git a/futures-util/src/try_stream/try_for_each_concurrent.rs b/futures-util/src/try_stream/try_for_each_concurrent.rs index 237905780d..bb6ef5add2 100644 --- a/futures-util/src/try_stream/try_for_each_concurrent.rs +++ b/futures-util/src/try_stream/try_for_each_concurrent.rs @@ -5,25 +5,22 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Future for the /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEachConcurrent { + #[pin] stream: Option, f: F, futures: FuturesUnordered, limit: Option, } -impl Unpin for TryForEachConcurrent -where St: Unpin, - Fut: Unpin, -{} - impl FusedFuture for TryForEachConcurrent { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -35,11 +32,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: Future>, { - unsafe_pinned!(stream: Option); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered); - unsafe_unpinned!(limit: Option); - pub(super) fn new(stream: St, limit: Option, f: F) -> TryForEachConcurrent { TryForEachConcurrent { stream: Some(stream), @@ -58,6 +50,7 @@ impl Future for TryForEachConcurrent { type Output = Result<(), St::Error>; + #[pin_project(self)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { let mut made_progress_this_iter = false; @@ -66,7 +59,7 @@ impl Future for TryForEachConcurrent let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { - let poll_res = match self.as_mut().stream().as_pin_mut() { + let poll_res = match self.stream.as_mut().as_pin_mut() { Some(stream) => stream.try_poll_next(cx), None => Poll::Ready(None), }; @@ -77,26 +70,26 @@ impl Future for TryForEachConcurrent Some(elem) }, Poll::Ready(None) => { - self.as_mut().stream().set(None); + self.stream.set(None); None } Poll::Pending => None, Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + self.stream.set(None); + drop(mem::replace(self.futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } }; if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + let next_future = (self.f)(elem); + self.futures.push(next_future); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match self.futures.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { if self.stream.is_none() { @@ -107,8 +100,8 @@ impl Future for TryForEachConcurrent Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + self.stream.set(None); + drop(mem::replace(self.futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } } diff --git a/futures-util/src/try_stream/try_skip_while.rs b/futures-util/src/try_stream/try_skip_while.rs index 7d509e06b6..b97ca8091c 100644 --- a/futures-util/src/try_stream/try_skip_while.rs +++ b/futures-util/src/try_stream/try_skip_while.rs @@ -3,33 +3,28 @@ use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, unsafe_project}; /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) /// method. +#[unsafe_project(Unpin)] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TrySkipWhile where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option, pending_item: Option, done_skipping: bool, } -impl Unpin for TrySkipWhile {} - impl TrySkipWhile where St: TryStream, F: FnMut(&St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option); - unsafe_unpinned!(pending_item: Option); - unsafe_unpinned!(done_skipping: bool); - pub(super) fn new(stream: St, f: F) -> TrySkipWhile { TrySkipWhile { stream, @@ -60,8 +55,9 @@ impl TrySkipWhile /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. + #[pin_project(self)] pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { - self.stream() + self.stream } /// Consumes this combinator, returning the underlying stream. @@ -80,31 +76,32 @@ impl Stream for TrySkipWhile { type Item = Result; + #[pin_project(self)] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if self.done_skipping { - return self.as_mut().stream().try_poll_next(cx); + if *self.done_skipping { + return self.stream.try_poll_next(cx); } loop { if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { + let item = match ready!(self.stream.as_mut().try_poll_next(cx)?) { Some(e) => e, None => return Poll::Ready(None), }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); + let fut = (self.f)(&item); + self.pending_fut.set(Some(fut)); + *self.pending_item = Some(item); } - let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?); - let item = self.as_mut().pending_item().take().unwrap(); - self.as_mut().pending_fut().set(None); + let skipped = ready!(self.pending_fut.as_mut().as_pin_mut().unwrap().try_poll(cx)?); + let item = self.pending_item.take().unwrap(); + self.pending_fut.set(None); if !skipped { - *self.as_mut().done_skipping() = true; + *self.done_skipping = true; return Poll::Ready(Some(Ok(item))) } }