Skip to content

Commit

Permalink
Use ready!/?/map more
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and Nemo157 committed May 23, 2019
1 parent 9e80141 commit 1eebcbe
Show file tree
Hide file tree
Showing 25 changed files with 110 additions and 179 deletions.
5 changes: 1 addition & 4 deletions futures-util/benches_disabled/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ impl Stream for LockStream {
type Error = ();

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> {
self.lock.poll(cx).map(|a| match a {
Poll::Ready(a) => Poll::Ready(Some(a)),
Poll::Pending => Poll::Pending,
})
self.lock.poll(cx).map(|a| a.map(Some))
}
}

Expand Down
53 changes: 23 additions & 30 deletions futures-util/src/compat/compat01as03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ pub trait Sink01CompatExt: Sink01 {
impl<Si: Sink01> Sink01CompatExt for Si {}

fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
match x {
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Err(e)),
match x? {
Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
Async01::NotReady => task03::Poll::Pending,
}
}

Expand All @@ -158,11 +157,10 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> task03::Poll<Option<Self::Item>> {
match self.in_notify(cx, Stream01::poll) {
Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))),
Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Some(Err(e))),
match self.in_notify(cx, Stream01::poll)? {
Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
Async01::Ready(None) => task03::Poll::Ready(None),
Async01::NotReady => task03::Poll::Pending,
}
}
}
Expand Down Expand Up @@ -213,11 +211,10 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> task03::Poll<Option<Self::Item>> {
match self.in_notify(cx, Stream01::poll) {
Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))),
Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Some(Err(e))),
match self.in_notify(cx, Stream01::poll)? {
Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
Async01::Ready(None) => task03::Poll::Ready(None),
Async01::NotReady => task03::Poll::Pending,
}
}
}
Expand All @@ -242,13 +239,12 @@ where
cx: &mut Context<'_>,
) -> task03::Poll<Result<(), Self::SinkError>> {
match self.buffer.take() {
Some(item) => match self.in_notify(cx, |f| f.start_send(item)) {
Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(i)) => {
Some(item) => match self.in_notify(cx, |f| f.start_send(item))? {
AsyncSink01::Ready => task03::Poll::Ready(Ok(())),
AsyncSink01::NotReady(i) => {
self.buffer = Some(i);
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
},
None => task03::Poll::Ready(Ok(())),
}
Expand All @@ -260,21 +256,19 @@ where
) -> task03::Poll<Result<(), Self::SinkError>> {
let item = self.buffer.take();
match self.in_notify(cx, |f| match item {
Some(i) => match f.start_send(i) {
Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)),
Ok(AsyncSink01::NotReady(t)) => {
Some(i) => match f.start_send(i)? {
AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)),
AsyncSink01::NotReady(t) => {
Ok((Async01::NotReady, Some(t)))
}
Err(e) => Err(e),
},
None => f.poll_complete().map(|i| (i, None)),
}) {
Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
Ok((Async01::NotReady, item)) => {
})? {
(Async01::Ready(_), _) => task03::Poll::Ready(Ok(())),
(Async01::NotReady, item) => {
self.buffer = item;
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
}
}

Expand All @@ -301,14 +295,13 @@ where
Ok((<S as Sink01>::close(f)?, None, true))
});

match result {
Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
Ok((Async01::NotReady, item, close_started)) => {
match result? {
(Async01::Ready(_), _, _) => task03::Poll::Ready(Ok(())),
(Async01::NotReady, item, close_started) => {
self.buffer = item;
self.close_started = close_started;
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions futures-util/src/compat/compat03as01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ impl<T, Item> CompatSink<T, Item> {
fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>)
-> Result<Async01<T>, E>
{
match x {
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
match x? {
task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
task03::Poll::Pending => Ok(Async01::NotReady),
task03::Poll::Ready(Err(e)) => Err(e),
}
}

Expand All @@ -103,11 +102,10 @@ where
type Error = St::Error;

fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
with_context(self, |inner, cx| match inner.try_poll_next(cx) {
with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
task03::Poll::Ready(Some(Ok(t))) => Ok(Async01::Ready(Some(t))),
task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
task03::Poll::Pending => Ok(Async01::NotReady),
task03::Poll::Ready(Some(Err(e))) => Err(e),
})
}
}
Expand All @@ -124,12 +122,11 @@ where
item: Self::SinkItem,
) -> StartSend01<Self::SinkItem, Self::SinkError> {
with_sink_context(self, |mut inner, cx| {
match inner.as_mut().poll_ready(cx) {
task03::Poll::Ready(Ok(())) => {
match inner.as_mut().poll_ready(cx)? {
task03::Poll::Ready(()) => {
inner.start_send(item).map(|()| AsyncSink01::Ready)
}
task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
task03::Poll::Ready(Err(e)) => Err(e),
}
})
}
Expand Down
6 changes: 2 additions & 4 deletions futures-util/src/future/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
loop {
let (output, data) = match this {
Chain::First(fut1, data) => {
match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(output) => (output, data.take().unwrap()),
}
let output = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx));
(output, data.take().unwrap())
}
Chain::Second(fut2) => {
return unsafe { Pin::new_unchecked(fut2) }.poll(cx);
Expand Down
9 changes: 1 addition & 8 deletions futures-util/src/future/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,8 @@ impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
// safety: we use this &mut only for matching, not for movement
let v = match self.as_mut().future().as_pin_mut() {
Some(fut) => {
// safety: this re-pinned future will never move before being dropped
match fut.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => v
}
}
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Pending,
};

Expand Down
6 changes: 1 addition & 5 deletions futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ impl<Fut, F> Future for Inspect<Fut, F>
type Output = Fut::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
let e = match self.as_mut().future().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(e) => e,
};

let e = ready!(self.as_mut().future().poll(cx));
let f = self.as_mut().f().take().expect("cannot poll Inspect twice");
f(&e);
Poll::Ready(e)
Expand Down
7 changes: 1 addition & 6 deletions futures-util/src/future/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ impl<Fut: Future> Stream for IntoStream<Fut> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
Some(fut) => {
match fut.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => v
}
}
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};

Expand Down
12 changes: 6 additions & 6 deletions futures-util/src/future/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ impl<Fut, F, T> Future for Map<Fut, F>
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.as_mut().future().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => {
self.as_mut()
.future()
.poll(cx)
.map(|output| {
let f = self.f().take()
.expect("Map must not be polled after it returned `Poll::Ready`");
Poll::Ready(f(output))
}
}
f(output)
})
}
}
8 changes: 1 addition & 7 deletions futures-util/src/future/maybe_done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,7 @@ impl<Fut: Future> Future for MaybeDone<Fut> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = unsafe {
match Pin::get_unchecked_mut(self.as_mut()) {
MaybeDone::Future(a) => {
if let Poll::Ready(res) = Pin::new_unchecked(a).poll(cx) {
res
} else {
return Poll::Pending
}
}
MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
MaybeDone::Done(_) => return Poll::Ready(()),
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
Expand Down
14 changes: 5 additions & 9 deletions futures-util/src/future/remote_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ impl<T: Send + 'static> Future for RemoteHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.rx.poll_unpin(cx) {
Poll::Ready(Ok(Ok(output))) => Poll::Ready(output),
Poll::Ready(Ok(Err(e))) => panic::resume_unwind(e),
Poll::Ready(Err(e)) => panic::resume_unwind(Box::new(e)),
Poll::Pending => Poll::Pending,
match ready!(self.rx.poll_unpin(cx)) {
Ok(Ok(output)) => Poll::Ready(output),
Ok(Err(e)) => panic::resume_unwind(e),
Err(e) => panic::resume_unwind(Box::new(e)),
}
}
}
Expand Down Expand Up @@ -88,10 +87,7 @@ impl<Fut: Future> Future for Remote<Fut> {
}
}

let output = match self.as_mut().future().poll(cx) {
Poll::Ready(output) => output,
Poll::Pending => return Poll::Pending,
};
let output = ready!(self.as_mut().future().poll(cx));

// if the receiving end has gone away then that's ok, we just ignore the
// send error here.
Expand Down
9 changes: 4 additions & 5 deletions futures-util/src/io/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ fn read_to_end_internal<R: AsyncRead + ?Sized>(
}
}

match rd.as_mut().poll_read(cx, &mut g.buf[g.len..]) {
Poll::Ready(Ok(0)) => {
match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(()));
break;
}
Poll::Ready(Ok(n)) => g.len += n,
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => {
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
Expand Down
8 changes: 2 additions & 6 deletions futures-util/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ fn lock_and_then<T, U, E, F>(
) -> Poll<Result<U, E>>
where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>>
{
match lock.poll_lock(cx) {
// Safety: the value behind the bilock used by `ReadHalf` and `WriteHalf` is never exposed
// as a `Pin<&mut T>` anywhere other than here as a way to get to `&mut T`.
Poll::Ready(mut l) => f(l.as_pin_mut(), cx),
Poll::Pending => Poll::Pending,
}
let mut l = ready!(lock.poll_lock(cx));
f(l.as_pin_mut(), cx)
}

pub(super) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
Expand Down
7 changes: 3 additions & 4 deletions futures-util/src/stream/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ where St: Stream,
mut self: Pin<&mut Self>, cx: &mut Context<'_>
) -> Poll<Self::Output> {
loop {
match self.as_mut().stream().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
match ready!(self.as_mut().stream().poll_next(cx)) {
None => {
return Poll::Ready(self.as_mut().accum().take().unwrap_or_default())
}
Poll::Ready(Some(e)) => {
Some(e) => {
let accum = self.as_mut().accum();
if let Some(a) = accum {
a.extend(e)
Expand Down
9 changes: 4 additions & 5 deletions futures-util/src/stream/futures_ordered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::FuturesUnordered;
use crate::stream::{FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
Expand Down Expand Up @@ -168,17 +168,16 @@ impl<Fut: Future> Stream for FuturesOrdered<Fut> {
}

loop {
match Pin::new(&mut this.in_progress_queue).poll_next(cx) {
Poll::Ready(Some(output)) => {
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
Some(output) => {
if output.index == this.next_outgoing_index {
this.next_outgoing_index += 1;
return Poll::Ready(Some(output.data));
} else {
this.queued_outputs.push(output)
}
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
None => return Poll::Ready(None),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/stream/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ impl<St, F, T> Stream for Map<St, F>
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
let option = ready!(self.as_mut().stream().poll_next(cx));
Poll::Ready(option.map(self.as_mut().f()))
self.as_mut()
.stream()
.poll_next(cx)
.map(|opt| opt.map(|x| self.as_mut().f()(x)))
}
}

Expand Down
3 changes: 2 additions & 1 deletion futures-util/src/stream/next.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
Expand Down Expand Up @@ -31,6 +32,6 @@ impl<St: Stream + Unpin> Future for Next<'_, St> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
self.stream.poll_next_unpin(cx)
}
}
Loading

0 comments on commit 1eebcbe

Please sign in to comment.