Skip to content

Commit

Permalink
Use pin-project instead of unsafe_[un]pinned
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jun 29, 2019
1 parent 6f16a1b commit 5290f66
Show file tree
Hide file tree
Showing 69 changed files with 644 additions and 776 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ members = [
"futures-util",
"futures-test",
]

[patch.crates-io]
pin-project = { git = "https://github.com/taiki-e/pin-project", branch = "pin_project" }
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ slab = { version = "0.4", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-project = "0.3.2"
pin-utils = "0.1.0-alpha.4"

[dev-dependencies]
Expand Down
11 changes: 6 additions & 5 deletions futures-util/src/future/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};
use std::any::Any;
use std::pin::Pin;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
use std::pin::Pin;

/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut> where Fut: Future {
#[pin]
future: Fut,
}

impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
CatchUnwind { future }
}
Expand All @@ -25,7 +25,8 @@ impl<Fut> Future for CatchUnwind<Fut>
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
catch_unwind(AssertUnwindSafe(|| self.future.poll(cx)))?.map(Ok)
}
}
9 changes: 5 additions & 4 deletions futures-util/src/future/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`flatten`](super::FutureExt::flatten) method.
#[unsafe_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
{
#[pin]
state: Chain<Fut, Fut::Output, ()>,
}

impl<Fut> Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
{
unsafe_pinned!(state: Chain<Fut, Fut::Output, ()>);

pub(super) fn new(future: Fut) -> Flatten<Fut> {
Flatten {
state: Chain::new(future, ()),
Expand Down Expand Up @@ -51,7 +51,8 @@ impl<Fut> Future for Flatten<Fut>
{
type Output = <Fut::Output as Future>::Output;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.state().poll(cx, |a, ()| a)
self.state.poll(cx, |a, ()| a)
}
}
11 changes: 6 additions & 5 deletions futures-util/src/future/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`fuse`](super::FutureExt::fuse) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Fuse<Fut: Future> {
#[pin]
future: Option<Fut>,
}

impl<Fut: Future> Fuse<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(f: Fut) -> Fuse<Fut> {
Fuse {
future: Some(f),
Expand Down Expand Up @@ -79,13 +79,14 @@ impl<Fut: Future> FusedFuture for Fuse<Fut> {
impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
let v = match self.as_mut().future().as_pin_mut() {
let v = match self.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Pending,
};

self.as_mut().future().set(None);
self.future.set(None);
Poll::Ready(v)
}
}
14 changes: 6 additions & 8 deletions futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use pin_project::{pin_project, unsafe_project};

/// Future for the [`inspect`](super::FutureExt::inspect) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Inspect<Fut, F> where Fut: Future {
#[pin]
future: Fut,
f: Option<F>,
}

impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(f: Option<F>);

pub(super) fn new(future: Fut, f: F) -> Inspect<Fut, F> {
Inspect {
future,
Expand All @@ -23,8 +22,6 @@ impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {
}
}

impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}

impl<Fut: Future + FusedFuture, F> FusedFuture for Inspect<Fut, F> {
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}
Expand All @@ -35,9 +32,10 @@ impl<Fut, F> Future for Inspect<Fut, F>
{
type Output = Fut::Output;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
let e = ready!(self.as_mut().future().poll(cx));
let f = self.as_mut().f().take().expect("cannot poll Inspect twice");
let e = ready!(self.future.as_mut().poll(cx));
let f = self.f.take().expect("cannot poll Inspect twice");
f(&e);
Poll::Ready(e)
}
Expand Down
11 changes: 6 additions & 5 deletions futures-util/src/future/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Stream for the [`into_stream`](super::FutureExt::into_stream) method.
#[unsafe_project(Unpin)]
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<Fut: Future> {
#[pin]
future: Option<Fut>
}

impl<Fut: Future> IntoStream<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(future: Fut) -> IntoStream<Fut> {
IntoStream {
future: Some(future)
Expand All @@ -24,13 +24,14 @@ impl<Fut: Future> IntoStream<Fut> {
impl<Fut: Future> Stream for IntoStream<Fut> {
type Item = Fut::Output;

#[pin_project(self)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
let v = match self.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};

self.as_mut().future().set(None);
self.future.set(None);
Poll::Ready(Some(v))
}
}
17 changes: 7 additions & 10 deletions futures-util/src/future/map.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use pin_project::{pin_project, unsafe_project};

/// Future for the [`map`](super::FutureExt::map) method.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Map<Fut, F> {
#[pin]
future: Fut,
f: Option<F>,
}

impl<Fut, F> Map<Fut, F> {
unsafe_pinned!(future: Fut);
unsafe_unpinned!(f: Option<F>);

/// Creates a new Map.
pub(super) fn new(future: Fut, f: F) -> Map<Fut, F> {
Map { future, f: Some(f) }
}
}

impl<Fut: Unpin, F> Unpin for Map<Fut, F> {}

impl<Fut, F> FusedFuture for Map<Fut, F> {
fn is_terminated(&self) -> bool { self.f.is_none() }
}
Expand All @@ -33,12 +29,13 @@ impl<Fut, F, T> Future for Map<Fut, F>
{
type Output = T;

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
self.as_mut()
.future()
self.future
.as_mut()
.poll(cx)
.map(|output| {
let f = self.f().take()
let f = self.f.take()
.expect("Map must not be polled after it returned `Poll::Ready`");
f(output)
})
Expand Down
11 changes: 5 additions & 6 deletions futures-util/src/future/never_error.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{self, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// Future for the [`never_error`](super::FutureExt::never_error) combinator.
#[unsafe_project(Unpin)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NeverError<Fut> {
#[pin]
future: Fut,
}

impl<Fut> NeverError<Fut> {
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> NeverError<Fut> {
NeverError { future }
}
}

impl<Fut: Unpin> Unpin for NeverError<Fut> {}

impl<Fut: FusedFuture> FusedFuture for NeverError<Fut> {
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}
Expand All @@ -29,7 +27,8 @@ impl<Fut, T> Future for NeverError<Fut>
{
type Output = Result<T, !>;

#[pin_project(self)]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<T, !>> {
self.future().poll(cx).map(Ok)
self.future.poll(cx).map(Ok)
}
}
11 changes: 5 additions & 6 deletions futures-util/src/future/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use pin_project::{pin_project, unsafe_project};

/// A future representing a value which may or may not be present.
///
Expand All @@ -23,24 +23,23 @@ use pin_utils::unsafe_pinned;
/// assert_eq!(a.await, None);
/// # });
/// ```
#[unsafe_project(Unpin)]
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct OptionFuture<F> {
#[pin]
option: Option<F>,
}

impl<F> OptionFuture<F> {
unsafe_pinned!(option: Option<F>);
}

impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;

#[pin_project(self)]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
match self.option().as_pin_mut() {
match self.option.as_pin_mut() {
Some(x) => x.poll(cx).map(Some),
None => Poll::Ready(None),
}
Expand Down
18 changes: 7 additions & 11 deletions futures-util/src/future/remote_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
future::Future,
task::{Context, Poll},
},
pin_utils::{unsafe_pinned, unsafe_unpinned},
pin_project::{pin_project, unsafe_project},
std::{
any::Any,
fmt,
Expand Down Expand Up @@ -54,10 +54,12 @@ type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'stati

/// A future which sends its output to the corresponding `RemoteHandle`.
/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
#[unsafe_project(Unpin)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Remote<Fut: Future> {
tx: Option<Sender<SendMsg<Fut>>>,
keep_running: Arc<AtomicBool>,
#[pin]
future: CatchUnwind<AssertUnwindSafe<Fut>>,
}

Expand All @@ -69,29 +71,23 @@ impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
}
}

impl<Fut: Future + Unpin> Unpin for Remote<Fut> {}

impl<Fut: Future> Remote<Fut> {
unsafe_pinned!(future: CatchUnwind<AssertUnwindSafe<Fut>>);
unsafe_unpinned!(tx: Option<Sender<SendMsg<Fut>>>);
}

impl<Fut: Future> Future for Remote<Fut> {
type Output = ();

#[pin_project(self)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(_) = self.as_mut().tx().as_mut().unwrap().poll_cancel(cx) {
if let Poll::Ready(_) = self.tx.as_mut().unwrap().poll_cancel(cx) {
if !self.keep_running.load(Ordering::SeqCst) {
// Cancelled, bail out
return Poll::Ready(())
}
}

let output = ready!(self.as_mut().future().poll(cx));
let output = ready!(self.future.as_mut().poll(cx));

// if the receiving end has gone away then that's ok, we just ignore the
// send error here.
drop(self.as_mut().tx().take().unwrap().send(output));
drop(self.tx.take().unwrap().send(output));
Poll::Ready(())
}
}
Expand Down
Loading

0 comments on commit 5290f66

Please sign in to comment.