Skip to content

Commit

Permalink
tokio-stream: add StreamExt::map_while (#4351)
Browse files Browse the repository at this point in the history
Fixes #4337

Rust 1.57 stabilized the `Iterator::map_while` API. This PR adds the
same functionality to the `StreamExt` trait, to keep parity.
  • Loading branch information
BraulioVM authored Dec 31, 2021
1 parent 43cdb2c commit fb35c83
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
45 changes: 45 additions & 0 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use fuse::Fuse;
mod map;
use map::Map;

mod map_while;
use map_while::MapWhile;

mod merge;
use merge::Merge;

Expand Down Expand Up @@ -201,6 +204,48 @@ pub trait StreamExt: Stream {
Map::new(self, f)
}

/// Map this stream's items to a different type for as long as determined by
/// the provided closure. A stream of the target type will be returned,
/// which will yield elements until the closure returns `None`.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available, until it returns `None`. It is executed inline
/// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
/// the underlying stream will not be polled again.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::map_while`] method in the
/// standard library.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
///
/// let stream = stream::iter(1..=10);
/// let mut stream = stream.map_while(|x| {
/// if x < 4 {
/// Some(x + 3)
/// } else {
/// None
/// }
/// });
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
where
F: FnMut(Self::Item) -> Option<T>,
Self: Sized,
{
MapWhile::new(self, f)
}

/// Maps this stream's items asynchronously to a different type, returning a
/// new stream of the resulting type.
///
Expand Down
52 changes: 52 additions & 0 deletions tokio-stream/src/stream_ext/map_while.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::Stream;

use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`map_while`](super::StreamExt::map_while) method.
#[must_use = "streams do nothing unless polled"]
pub struct MapWhile<St, F> {
#[pin]
stream: St,
f: F,
}
}

impl<St, F> fmt::Debug for MapWhile<St, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MapWhile")
.field("stream", &self.stream)
.finish()
}
}

impl<St, F> MapWhile<St, F> {
pub(super) fn new(stream: St, f: F) -> Self {
MapWhile { stream, f }
}
}

impl<St, F, T> Stream for MapWhile<St, F>
where
St: Stream,
F: FnMut(St::Item) -> Option<T>,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let me = self.project();
let f = me.f;
me.stream.poll_next(cx).map(|opt| opt.and_then(f))
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (_, upper) = self.stream.size_hint();
(0, upper)
}
}

0 comments on commit fb35c83

Please sign in to comment.